Best Python code snippet using playwright-python
test_storage_state.py
Source:test_storage_state.py
1# pylint: disable=protected-access, too-many-public-methods2from math import isclose3from unittest.mock import patch, MagicMock, PropertyMock4import pytest5from pendulum import now, duration6from gsy_e.constants import FLOATING_POINT_TOLERANCE7from gsy_e.models.state import StorageState, ESSEnergyOrigin, EnergyOrigin8SAMPLE_STATE = {9 "pledged_sell_kWh": {},10 "offered_sell_kWh": {},11 "pledged_buy_kWh": {},12 "offered_buy_kWh": {},13 "charge_history": {},14 "charge_history_kWh": {},15 "offered_history": {},16 "used_history": {},17 "energy_to_buy_dict": {},18 "energy_to_sell_dict": {},19 "used_storage": 0.0,20 "battery_energy_per_slot": 0.0,21}22SAMPLE_STATS = {23 "energy_to_sell": 0.0,24 "energy_active_in_bids": 0.0,25 "energy_to_buy": 0.0,26 "energy_active_in_offers": 0.0,27 "free_storage": 0.0,28 "used_storage": 0.0,29}30class TestStorageState:31 """Test the StorageState class."""32 def test_market_cycle_reset_orders(self):33 """Test the market cycle handler of the storage state.34 TODO: Cover the whole module in context of GSY-E:9235 """36 storage_state = StorageState()37 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()38 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]39 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)40 storage_state.offered_buy_kWh[future_time_slots[0]] = 1041 storage_state.offered_sell_kWh[future_time_slots[0]] = 1042 with patch("gsy_e.models.state.GlobalConfig.FUTURE_MARKET_DURATION_HOURS", 5):43 storage_state.market_cycle(past_time_slot, current_time_slot, future_time_slots)44 # The future_time_slots[0] is in the future, so it won't reset45 assert storage_state.offered_buy_kWh[future_time_slots[0]] == 1046 assert storage_state.offered_sell_kWh[future_time_slots[0]] == 1047 storage_state.market_cycle(48 current_time_slot, future_time_slots[0], future_time_slots[1:])49 # The future_time_slots[0] is in the spot market, so it has to 
reset the orders50 assert storage_state.offered_buy_kWh[future_time_slots[0]] == 051 assert storage_state.offered_sell_kWh[future_time_slots[0]] == 052 def test_market_cycle_update_used_storage(self):53 storage_state = StorageState(initial_soc=100,54 capacity=100)55 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()56 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]57 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)58 storage_state.pledged_sell_kWh[past_time_slot] = 1059 storage_state.pledged_buy_kWh[past_time_slot] = 060 storage_state.market_cycle(past_time_slot, current_time_slot, future_time_slots)61 assert storage_state.used_storage == 9062 storage_state.pledged_sell_kWh[past_time_slot] = 063 storage_state.pledged_buy_kWh[past_time_slot] = 1064 storage_state.market_cycle(past_time_slot, current_time_slot, future_time_slots)65 assert storage_state.used_storage == 10066 def test_market_cycle_ess_share_time_series_dict(self):67 storage_state = StorageState(initial_soc=100,68 capacity=100,69 initial_energy_origin=ESSEnergyOrigin.LOCAL,70 max_abs_battery_power_kW=15)71 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()72 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]73 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)74 energy = 0.375 storage_state.activate(76 slot_length=duration(minutes=15), current_time_slot=current_time_slot)77 storage_state.register_energy_from_one_sided_market_accept_offer(78 energy, past_time_slot, ESSEnergyOrigin.LOCAL)79 storage_state.register_energy_from_one_sided_market_accept_offer(80 energy, past_time_slot, ESSEnergyOrigin.UNKNOWN)81 storage_state.register_energy_from_one_sided_market_accept_offer(82 energy, past_time_slot, ESSEnergyOrigin.UNKNOWN)83 storage_state.market_cycle(past_time_slot, current_time_slot, future_time_slots)84 
expected_ess_share_last_market = {85 ESSEnergyOrigin.LOCAL: storage_state.initial_capacity_kWh + energy,86 ESSEnergyOrigin.EXTERNAL: 0.0,87 ESSEnergyOrigin.UNKNOWN: 2 * energy88 }89 assert (storage_state.time_series_ess_share[past_time_slot] ==90 expected_ess_share_last_market)91 def test_market_cycle_calculate_and_update_soc_and_set_offer_history(self):92 storage_state = StorageState(initial_soc=100,93 capacity=100,94 min_allowed_soc=20)95 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()96 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]97 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)98 storage_state.activate(99 slot_length=duration(minutes=15), current_time_slot=current_time_slot)100 storage_state.pledged_sell_kWh[past_time_slot] = 10101 storage_state.market_cycle(past_time_slot, current_time_slot, future_time_slots)102 assert storage_state.charge_history[current_time_slot] != storage_state.initial_soc103 assert (storage_state.charge_history_kWh[current_time_slot] !=104 storage_state.initial_capacity_kWh)105 assert storage_state.offered_history[current_time_slot] != "-"106 @staticmethod107 def _initialize_time_slots():108 past_time_slot = now()109 current_time_slot = past_time_slot.add(minutes=15)110 future_time_slots = [current_time_slot.add(minutes=15),111 current_time_slot.add(minutes=30)]112 return past_time_slot, current_time_slot, future_time_slots113 @staticmethod114 def test_check_state_charge_less_than_min_soc_error():115 storage_state = StorageState(capacity=100,116 min_allowed_soc=50,117 initial_soc=30)118 current_time_slot = now()119 storage_state.add_default_values_to_state_profiles([current_time_slot])120 storage_state.activate(121 slot_length=duration(minutes=15), current_time_slot=current_time_slot)122 with pytest.raises(AssertionError) as error:123 storage_state.check_state(current_time_slot)124 assert "less than min soc" in 
str(error.value)125 @staticmethod126 def test_check_state_storage_surpasses_capacity_error():127 storage_state = StorageState(capacity=100,128 min_allowed_soc=50,129 initial_soc=110)130 current_time_slot = now()131 storage_state.add_default_values_to_state_profiles([current_time_slot])132 storage_state.activate(133 slot_length=duration(minutes=15), current_time_slot=current_time_slot)134 with pytest.raises(AssertionError) as error:135 storage_state.check_state(current_time_slot)136 assert "surpassed the capacity" in str(error.value)137 @staticmethod138 def test_check_state_offered_and_pledged_energy_in_range():139 storage_state = StorageState(capacity=100,140 min_allowed_soc=20,141 initial_soc=50)142 current_time_slot = now()143 storage_state.add_default_values_to_state_profiles([current_time_slot])144 max_value = storage_state.capacity * (1 - storage_state.min_allowed_soc_ratio)145 storage_state.max_abs_battery_power_kW = max_value * 15146 storage_state.activate(147 slot_length=duration(minutes=15), current_time_slot=current_time_slot)148 def set_attribute_value_and_test(attribute):149 attribute[current_time_slot] = -1150 # check that error is raised for value < 0151 with pytest.raises(AssertionError):152 storage_state.check_state(current_time_slot)153 attribute[current_time_slot] = max_value + 1154 # check that error is raised for value > max_value155 with pytest.raises(AssertionError):156 storage_state.check_state(current_time_slot)157 attribute[current_time_slot] = max_value / 2158 # check that for value in range no error is raised159 try:160 storage_state.check_state(current_time_slot)161 except AssertionError as error:162 raise AssertionError from error163 set_attribute_value_and_test(storage_state.offered_sell_kWh)164 set_attribute_value_and_test(storage_state.pledged_sell_kWh)165 set_attribute_value_and_test(storage_state.offered_buy_kWh)166 set_attribute_value_and_test(storage_state.pledged_buy_kWh)167 @staticmethod168 def 
_setup_storage_state_for_clamp_energy():169 storage_state = StorageState()170 current_time_slot = now()171 market_slot_list = [current_time_slot,172 current_time_slot + duration(minutes=15),173 current_time_slot + duration(minutes=30),174 current_time_slot + duration(minutes=45)]175 storage_state._current_market_slot = current_time_slot176 storage_state._used_storage = 250.0177 storage_state.capacity = 500.0178 for time_slot in market_slot_list:179 storage_state.offered_sell_kWh[time_slot] = 0.0180 storage_state.pledged_sell_kWh[time_slot] = 0.0181 storage_state.offered_buy_kWh[time_slot] = 0.0182 storage_state.pledged_buy_kWh[time_slot] = 0.0183 return storage_state, market_slot_list184 def test_clamp_energy_to_sell_kWh_only_on_first_slot(self):185 storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()186 # Enable battery to sell all its available capacity in one market slot187 storage_state._battery_energy_per_slot = storage_state.capacity188 storage_state._clamp_energy_to_sell_kWh(market_slot_list)189 expected_available_energy = (190 storage_state.used_storage -191 storage_state.min_allowed_soc_ratio * storage_state.capacity)192 expected_energy_to_sell = {193 market_slot_list[0]: expected_available_energy,194 market_slot_list[1]: 0.0,195 market_slot_list[2]: 0.0,196 market_slot_list[3]: 0.0,197 }198 assert expected_energy_to_sell == storage_state.energy_to_sell_dict199 def test_clamp_energy_to_sell_kWh_respects_battery_energy_per_slot(self):200 storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()201 storage_state._battery_energy_per_slot = 25.0202 storage_state._clamp_energy_to_sell_kWh(market_slot_list)203 expected_energy_to_sell = {204 market_slot_list[0]: 25.0,205 market_slot_list[1]: 25.0,206 market_slot_list[2]: 25.0,207 market_slot_list[3]: 25.0,208 }209 assert expected_energy_to_sell == storage_state.energy_to_sell_dict210 def test_clamp_energy_to_sell_kWh_respects_energy_offers_trades(self):211 
storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()212 storage_state._battery_energy_per_slot = 25.0213 for time_slot in market_slot_list:214 storage_state.offered_sell_kWh[time_slot] = 5.0215 storage_state.pledged_sell_kWh[time_slot] = 15.0216 storage_state._clamp_energy_to_sell_kWh(market_slot_list)217 expected_energy_to_sell = {218 market_slot_list[0]: 5.0,219 market_slot_list[1]: 5.0,220 market_slot_list[2]: 5.0,221 market_slot_list[3]: 5.0,222 }223 assert expected_energy_to_sell == storage_state.energy_to_sell_dict224 def test_clamp_energy_to_buy_kWh_only_on_first_slot(self):225 storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()226 # Enable battery to sell all its available capacity in one market slot227 storage_state._battery_energy_per_slot = storage_state.capacity228 storage_state._clamp_energy_to_buy_kWh(market_slot_list)229 expected_available_energy = storage_state.capacity - storage_state.used_storage230 expected_energy_to_buy = {231 market_slot_list[0]: expected_available_energy,232 market_slot_list[1]: 0.0,233 market_slot_list[2]: 0.0,234 market_slot_list[3]: 0.0,235 }236 assert expected_energy_to_buy == storage_state.energy_to_buy_dict237 def test_clamp_energy_to_buy_kWh_respects_battery_energy_per_slot(self):238 storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()239 storage_state._battery_energy_per_slot = 25.0240 storage_state._clamp_energy_to_buy_kWh(market_slot_list)241 expected_energy_to_buy = {242 market_slot_list[0]: 25.0,243 market_slot_list[1]: 25.0,244 market_slot_list[2]: 25.0,245 market_slot_list[3]: 25.0,246 }247 assert expected_energy_to_buy == storage_state.energy_to_buy_dict248 def test_clamp_energy_to_buy_kWh_respects_energy_offers_trades(self):249 storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()250 storage_state._battery_energy_per_slot = 25.0251 for time_slot in market_slot_list:252 storage_state.offered_buy_kWh[time_slot] 
= 5.0253 storage_state.pledged_buy_kWh[time_slot] = 15.0254 storage_state._clamp_energy_to_buy_kWh(market_slot_list)255 expected_energy_to_buy = {256 market_slot_list[0]: 5.0,257 market_slot_list[1]: 5.0,258 market_slot_list[2]: 5.0,259 market_slot_list[3]: 5.0,260 }261 assert expected_energy_to_buy == storage_state.energy_to_buy_dict262 @pytest.mark.parametrize("is_selling", [True, False])263 def test_clamp_energy_asserts_battery_traded_more_than_energy_per_slot(self, is_selling):264 storage_state, market_slot_list = self._setup_storage_state_for_clamp_energy()265 storage_state._battery_energy_per_slot = 0.0266 for time_slot in market_slot_list:267 storage_state.offered_buy_kWh[time_slot] = 5.0268 storage_state.pledged_buy_kWh[time_slot] = 15.0269 storage_state.offered_sell_kWh[time_slot] = 5.0270 storage_state.pledged_sell_kWh[time_slot] = 15.0271 with pytest.raises(AssertionError):272 if is_selling:273 storage_state._clamp_energy_to_sell_kWh(market_slot_list)274 else:275 storage_state._clamp_energy_to_buy_kWh(market_slot_list)276 @staticmethod277 def test_get_state_keys_in_dict():278 storage_state = StorageState()279 current_time_slot = now()280 storage_state.add_default_values_to_state_profiles([current_time_slot])281 assert set(SAMPLE_STATE.keys()).issubset(storage_state.get_state().keys())282 def test_restore_state(self):283 storage_state = StorageState(initial_soc=100,284 capacity=100,285 min_allowed_soc=20)286 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()287 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]288 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)289 storage_state.activate(290 slot_length=duration(minutes=15), current_time_slot=current_time_slot)291 past_state = storage_state.get_state()292 storage_state.pledged_sell_kWh[current_time_slot] = 50293 modified_state = storage_state.get_state()294 assert past_state != modified_state295 
storage_state.restore_state(state_dict=past_state)296 assert storage_state.get_state() == past_state297 @staticmethod298 def test_free_storage_return_float():299 storage_state = StorageState(initial_soc=100,300 capacity=100,301 min_allowed_soc=20)302 current_time_slot = now()303 storage_state.add_default_values_to_state_profiles([current_time_slot])304 assert isinstance(storage_state.free_storage(current_time_slot), float)305 @staticmethod306 def test_activate_convert_energy_to_power():307 with patch("gsy_e.models.state.convert_kW_to_kWh") as mocked_func:308 storage_state = StorageState()309 current_time_slot = now()310 storage_state.activate(311 slot_length=duration(minutes=15), current_time_slot=current_time_slot)312 mocked_func.assert_called()313 def test_add_default_values_to_state_profiles_set_values_for_time_slots(self):314 storage_state = StorageState()315 _, _, future_time_slots = self._initialize_time_slots()316 storage_state.add_default_values_to_state_profiles(future_time_slots)317 def assert_time_slot_in_dict_attribute_with_default_value(attribute, time_slot, default):318 assert time_slot in attribute.keys()319 assert attribute[time_slot] == default320 for time_slot in future_time_slots:321 assert_time_slot_in_dict_attribute_with_default_value(322 storage_state.pledged_sell_kWh, time_slot, 0)323 assert_time_slot_in_dict_attribute_with_default_value(324 storage_state.pledged_buy_kWh, time_slot, 0)325 assert_time_slot_in_dict_attribute_with_default_value(326 storage_state.offered_sell_kWh, time_slot, 0)327 assert_time_slot_in_dict_attribute_with_default_value(328 storage_state.offered_buy_kWh, time_slot, 0)329 assert_time_slot_in_dict_attribute_with_default_value(330 storage_state.charge_history, time_slot, storage_state.initial_soc)331 assert_time_slot_in_dict_attribute_with_default_value(332 storage_state.charge_history_kWh, time_slot, storage_state.initial_capacity_kWh)333 assert_time_slot_in_dict_attribute_with_default_value(334 
storage_state.energy_to_sell_dict, time_slot, 0)335 assert_time_slot_in_dict_attribute_with_default_value(336 storage_state.energy_to_buy_dict, time_slot, 0)337 assert_time_slot_in_dict_attribute_with_default_value(338 storage_state.offered_history, time_slot, "-")339 assert_time_slot_in_dict_attribute_with_default_value(340 storage_state.used_history, time_slot, "-")341 assert_time_slot_in_dict_attribute_with_default_value(342 storage_state.time_series_ess_share, time_slot, {ESSEnergyOrigin.UNKNOWN: 0.,343 ESSEnergyOrigin.LOCAL: 0.,344 ESSEnergyOrigin.EXTERNAL: 0.}345 )346 def test_delete_past_state_values_market_slot_not_in_past(self):347 storage_state = StorageState()348 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()349 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]350 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)351 with patch("gsy_e.gsy_e_core.util.ConstSettings.SettlementMarketSettings."352 "ENABLE_SETTLEMENT_MARKETS", True):353 storage_state.delete_past_state_values(current_time_slot)354 assert storage_state.pledged_sell_kWh.get(past_time_slot) is not None355 def test_delete_past_state_values_market_slot_in_past(self):356 storage_state = StorageState()357 past_time_slot, current_time_slot, future_time_slots = self._initialize_time_slots()358 active_market_slot_time_list = [past_time_slot, current_time_slot, *future_time_slots]359 storage_state.add_default_values_to_state_profiles(active_market_slot_time_list)360 with patch("gsy_e.gsy_e_core.util.ConstSettings.SettlementMarketSettings."361 "ENABLE_SETTLEMENT_MARKETS", False):362 storage_state.delete_past_state_values(current_time_slot)363 assert storage_state.pledged_sell_kWh.get(past_time_slot) is None364 def test_register_energy_from_posted_bid_negative_energy_raise_error(self):365 storage_state, current_time_slot = self._setup_registration_test()366 self._assert_negative_energy_raise_error(367 
storage_state.register_energy_from_posted_bid, time_slot=current_time_slot)368 def test_register_energy_from_posted_bid_energy_add_energy_to_offered_buy_dict(self):369 storage_state, current_time_slot = self._setup_registration_test()370 bid_energy = 1371 storage_state.register_energy_from_posted_bid(372 energy=bid_energy, time_slot=current_time_slot)373 storage_state.register_energy_from_posted_bid(374 energy=bid_energy, time_slot=current_time_slot)375 assert storage_state.offered_buy_kWh[current_time_slot] == 2 * bid_energy376 def test_register_energy_from_posted_offer_negative_energy_raise_error(self):377 storage_state, current_time_slot = self._setup_registration_test()378 self._assert_negative_energy_raise_error(379 storage_state.register_energy_from_posted_offer,380 time_slot=current_time_slot)381 def test_register_energy_from_posted_offer_energy_add_energy_to_offer_sell_dict(self):382 storage_state, current_time_slot = self._setup_registration_test()383 offered_energy = 1384 storage_state.register_energy_from_posted_offer(385 energy=offered_energy, time_slot=current_time_slot)386 storage_state.register_energy_from_posted_offer(387 energy=offered_energy, time_slot=current_time_slot)388 assert storage_state.offered_sell_kWh[current_time_slot] == 2 * offered_energy389 def test_reset_offered_sell_energy_negative_energy_raise_error(self):390 storage_state, current_time_slot = self._setup_registration_test()391 self._assert_negative_energy_raise_error(392 storage_state.reset_offered_sell_energy, time_slot=current_time_slot)393 def test_reset_offered_sell_energy_energy_in_offered_sell_dict(self):394 storage_state, current_time_slot = self._setup_registration_test()395 offered_sell_energy = 1396 storage_state.reset_offered_sell_energy(397 energy=offered_sell_energy, time_slot=current_time_slot)398 assert storage_state.offered_sell_kWh[current_time_slot] == offered_sell_energy399 def test_reset_offered_buy_energy_negative_energy_raise_error(self):400 storage_state, 
current_time_slot = self._setup_registration_test()401 self._assert_negative_energy_raise_error(402 storage_state.reset_offered_buy_energy, time_slot=current_time_slot)403 def test_reset_offered_buy_energy_energy_in_offered_buy_dict(self):404 storage_state, current_time_slot = self._setup_registration_test()405 offered_buy_energy = 1406 storage_state.reset_offered_buy_energy(407 energy=offered_buy_energy, time_slot=current_time_slot)408 assert storage_state.offered_buy_kWh[current_time_slot] == offered_buy_energy409 def test_remove_energy_from_deleted_offer_negative_energy_raise_error(self):410 storage_state, current_time_slot = self._setup_registration_test()411 self._assert_negative_energy_raise_error(412 storage_state.remove_energy_from_deleted_offer, time_slot=current_time_slot)413 def test_remove_energy_from_deleted_offer_energy_in_offered_buy_dict(self):414 storage_state, current_time_slot = self._setup_registration_test()415 offered_energy = 1416 storage_state.offered_sell_kWh[current_time_slot] = offered_energy417 storage_state.remove_energy_from_deleted_offer(418 energy=offered_energy, time_slot=current_time_slot)419 assert storage_state.offered_buy_kWh[current_time_slot] == 0420 def test_register_energy_from_one_sided_market_accept_offer_negative_energy_raise_error(self):421 storage_state, current_time_slot = self._setup_registration_test()422 self._assert_negative_energy_raise_error(423 storage_state.register_energy_from_one_sided_market_accept_offer,424 time_slot=current_time_slot)425 def test_register_energy_from_one_sided_market_accept_offer_energy_register_in_dict(self):426 storage_state, current_time_slot = self._setup_registration_test()427 energy = 1428 storage_state.register_energy_from_one_sided_market_accept_offer(429 energy=energy, time_slot=current_time_slot)430 storage_state.register_energy_from_one_sided_market_accept_offer(431 energy=energy, time_slot=current_time_slot)432 assert storage_state.pledged_buy_kWh[current_time_slot] == 2 * 
energy433 def test_register_energy_from_bid_trade_negative_energy_raise_error(self):434 storage_state, current_time_slot = self._setup_registration_test()435 self._assert_negative_energy_raise_error(436 storage_state.register_energy_from_bid_trade, time_slot=current_time_slot)437 def test_register_energy_from_bid_trade_energy_register_in_dict(self):438 storage_state, current_time_slot = self._setup_registration_test()439 energy = 1440 storage_state.offered_buy_kWh[current_time_slot] = 2 * energy441 storage_state.register_energy_from_bid_trade(442 energy=energy, time_slot=current_time_slot)443 storage_state.register_energy_from_bid_trade(444 energy=energy, time_slot=current_time_slot)445 assert storage_state.pledged_buy_kWh[current_time_slot] == 2 * energy446 assert storage_state.offered_buy_kWh[current_time_slot] == 0447 def test_register_energy_from_offer_trade_negative_energy_raise_error(self):448 storage_state, current_time_slot = self._setup_registration_test()449 self._assert_negative_energy_raise_error(450 storage_state.register_energy_from_offer_trade, time_slot=current_time_slot)451 def test_register_energy_from_offer_trade_energy_register_in_dict(self):452 storage_state, current_time_slot = self._setup_registration_test()453 energy = 1454 storage_state.offered_sell_kWh[current_time_slot] = 2 * energy455 storage_state.register_energy_from_offer_trade(456 energy=energy, time_slot=current_time_slot)457 storage_state.register_energy_from_offer_trade(458 energy=energy, time_slot=current_time_slot)459 assert storage_state.pledged_sell_kWh[current_time_slot] == 2 * energy460 assert storage_state.offered_sell_kWh[current_time_slot] == 0461 def _setup_registration_test(self):462 storage_state = StorageState()463 current_time_slot, _, _ = self._initialize_time_slots()464 storage_state.add_default_values_to_state_profiles([current_time_slot])465 storage_state.activate(466 slot_length=duration(minutes=15), current_time_slot=current_time_slot)467 return storage_state, 
current_time_slot468 @staticmethod469 def _assert_negative_energy_raise_error(method, time_slot):470 energy = -1471 with pytest.raises(AssertionError):472 method(energy, time_slot)473 @staticmethod474 def _setup_storage_state_for_energy_origin_tracking():475 storage_state = StorageState()476 used_storage_share = [EnergyOrigin(ESSEnergyOrigin.LOCAL, 0.4),477 EnergyOrigin(ESSEnergyOrigin.EXTERNAL, 0.5),478 EnergyOrigin(ESSEnergyOrigin.UNKNOWN, 0.1)]479 storage_state._used_storage_share = used_storage_share480 return storage_state481 def test_track_energy_bought_type_append_new_energy_origin_respecting_origin(self):482 storage_state = self._setup_storage_state_for_energy_origin_tracking()483 initial_registry_number = len(storage_state._used_storage_share)484 energy = 1.0485 storage_state._track_energy_bought_type(energy=energy, energy_origin=ESSEnergyOrigin.LOCAL)486 assert len(storage_state._used_storage_share) == initial_registry_number + 1487 assert isinstance(storage_state._used_storage_share[-1], EnergyOrigin)488 assert storage_state._used_storage_share[-1].origin == ESSEnergyOrigin.LOCAL489 assert storage_state._used_storage_share[-1].value == energy490 def test_track_energy_sell_type_sell_all_energy(self):491 storage_state = self._setup_storage_state_for_energy_origin_tracking()492 available_energy_for_sell = 0493 for energy in storage_state._used_storage_share:494 available_energy_for_sell += energy.value495 storage_state._track_energy_sell_type(energy=available_energy_for_sell)496 if len(storage_state._used_storage_share) != 0:497 assert isclose(498 storage_state._used_storage_share[0].value, 0, abs_tol=FLOATING_POINT_TOLERANCE)499 else:500 assert len(storage_state._used_storage_share) == 0501 def test_track_energy_sell_type_sell_more_energy_than_available(self):502 storage_state = self._setup_storage_state_for_energy_origin_tracking()503 available_energy_for_sell = 0504 used_storage_share = storage_state._used_storage_share505 for energy in 
used_storage_share:506 available_energy_for_sell += energy.value507 storage_state._track_energy_sell_type(energy=available_energy_for_sell + 0.1)508 assert len(storage_state._used_storage_share) == 0509 def test_track_energy_sell_type_sell_only_first_entry_completely(self):510 storage_state = self._setup_storage_state_for_energy_origin_tracking()511 energy_for_sale = 0.4512 initial_registry_number = len(storage_state._used_storage_share)513 original_used_storage_share = storage_state._used_storage_share.copy()514 storage_state._track_energy_sell_type(energy=energy_for_sale)515 assert (len(storage_state._used_storage_share) ==516 initial_registry_number - 1)517 assert (storage_state._used_storage_share[0].origin ==518 original_used_storage_share[1].origin)519 assert isclose(520 storage_state._used_storage_share[0].value,521 original_used_storage_share[1].value,522 abs_tol=FLOATING_POINT_TOLERANCE)523 def test_get_soc_level_default_values_and_custom_values(self):524 storage_state = StorageState(initial_soc=100,525 capacity=100)526 current_time_slot, _, _ = self._initialize_time_slots()527 storage_state.add_default_values_to_state_profiles([current_time_slot])528 storage_state.activate(529 slot_length=duration(minutes=15), current_time_slot=current_time_slot)530 assert storage_state.get_soc_level(current_time_slot) == 1531 storage_state.charge_history[current_time_slot] = 50532 assert storage_state.get_soc_level(current_time_slot) == 0.5533 def test_to_dict_keys_in_return_dict(self):534 storage_state = StorageState()535 current_time_slot, _, _ = self._initialize_time_slots()536 storage_state.add_default_values_to_state_profiles([current_time_slot])537 storage_state.energy_to_sell_dict[current_time_slot] = "test_energy_to_sell"538 storage_state.offered_sell_kWh[current_time_slot] = "test_energy_active_in_offers"539 storage_state.energy_to_buy_dict[current_time_slot] = "test_energy_to_buy"540 storage_state.offered_buy_kWh[current_time_slot] = 
"test_energy_active_in_bids"541 free_storage_mock = MagicMock(return_value="test_free_storage")542 used_storage_mock = PropertyMock()543 storage_state.free_storage = free_storage_mock544 storage_state._used_storage = used_storage_mock545 assert set(SAMPLE_STATS.keys()).issubset(storage_state.to_dict(current_time_slot).keys())546 assert (storage_state.to_dict(current_time_slot)["energy_to_sell"] ==547 "test_energy_to_sell")548 assert (storage_state.to_dict(current_time_slot)["energy_active_in_bids"] ==549 "test_energy_active_in_bids")550 assert (storage_state.to_dict(current_time_slot)["energy_to_buy"] ==551 "test_energy_to_buy")552 assert (storage_state.to_dict(current_time_slot)["energy_active_in_offers"] ==553 "test_energy_active_in_offers")554 assert (storage_state.to_dict(current_time_slot)["free_storage"] ==555 "test_free_storage")556 assert (storage_state.to_dict(current_time_slot)["used_storage"] ==...
browser.py
Source:browser.py
...288 if err.message == "NS_BINDING_ABORTED":289 logger.error(f"Ignoring {err} exception")290 cookiefile = f"browser_cookie_{coachid}.json"291 logger.info(f"Store cookie state in {cookiefile}")292 storage_state = context.storage_state()293 with open(cookiefile, "w") as write_file:294 json.dump(storage_state, write_file)295 logger.debug("progress.emit(1)")296 progress.emit(1)297 else:298 logger.error(f"Exception during select text 'My Locker' section: {err.__class__}")299 logger.error(f"Exception = {err}")300 logger.debug("progress.emit(999999)")301 progress.emit(999999)302 return False303 else:304 logger.info("Found 'My Locker' so authentication was successful.")305 time.sleep(10)306 cookiefile = os.path.join(myconfig.cookies_directory_path, f"browser_cookie_{coachid}.json")307 logger.info(f"Store cookie state in {cookiefile}")308 storage_state = context.storage_state()309 with open(cookiefile, "w") as write_file:310 json.dump(storage_state, write_file)311 logger.debug("progress.emit(1)")312 progress.emit(1)313 return True314 finally:315 316 context.close()317 browser.close()318 logger.info("Playwright browser closed.")319 320 else:321 context = browser.new_context(storage_state=storage_state)322 page = context.new_page()...
remote.py
Source:remote.py
1# -.- coding: utf-8 -.-2# Zeitgeist3#4# Copyright © 2009-2010 Siegfried-Angel Gevatter Pujals <rainct@ubuntu.com>5# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@gmail.com>6# Copyright © 2010 Seif Lotfy <seif@lotfy.com>7# Copyright © 2011 Markus Korn <thekorn@gmx.de>8#9# This program is free software: you can redistribute it and/or modify10# it under the terms of the GNU Lesser General Public License as published by11# the Free Software Foundation, either version 2.1 of the License, or12# (at your option) any later version.13#14# This program is distributed in the hope that it will be useful,15# but WITHOUT ANY WARRANTY; without even the implied warranty of16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the17# GNU Lesser General Public License for more details.18#19# You should have received a copy of the GNU Lesser General Public License20# along with this program. If not, see <http://www.gnu.org/licenses/>.21import dbus22import dbus.service23import logging24from xml.etree import ElementTree25from zeitgeist.datamodel import TimeRange, StorageState, ResultType, NULL_EVENT26from _zeitgeist.engine.datamodel import Event, Subject27from _zeitgeist.engine import get_engine28from _zeitgeist.engine.notify import MonitorManager29from _zeitgeist.engine import constants30from _zeitgeist.singleton import SingletonApplication31class DBUSProperty(property):32 33 def __init__(self, fget=None, fset=None, in_signature=None, out_signature=None):34 assert not (fget and not out_signature), "fget needs a dbus signature"35 assert not (fset and not in_signature), "fset needs a dbus signature"36 assert (fget and not fset) or (fset and fget), \37 "dbus properties needs to be either readonly or readwritable"38 self.in_signature = in_signature39 self.out_signature = out_signature40 super(DBUSProperty, self).__init__(fget, fset)41class RemoteInterface(SingletonApplication):42 """43 Primary interface to the Zeitgeist engine. Used to update and query44 the log. 
It also provides means to listen for events matching certain45 criteria. All querying is heavily based around an46 "event template"-concept.47 48 The main log of the Zeitgeist engine has DBus object path49 :const:`/org/gnome/zeitgeist/log/activity` under the bus name50 :const:`org.gnome.zeitgeist.Engine`.51 """52 _dbus_properties = {53 "version": DBUSProperty(lambda self: (0, 7, 1), out_signature="iii"),54 "extensions": DBUSProperty(55 lambda self: list(self._engine.extensions.iter_names()),56 out_signature="as"),57 }58 59 # Initialization60 61 def __init__(self, start_dbus=True, mainloop=None):62 SingletonApplication.__init__(self)63 self._mainloop = mainloop64 self._engine = get_engine()65 self._notifications = MonitorManager()66 67 # Private methods68 69 def _make_events_sendable(self, events):70 for event in events:71 if event is not None:72 event._make_dbus_sendable()73 return [NULL_EVENT if event is None else event for event in events]74 75 # Reading stuff76 77 @dbus.service.method(constants.DBUS_INTERFACE,78 in_signature="au",79 out_signature="a("+constants.SIG_EVENT+")",80 sender_keyword="sender")81 def GetEvents(self, event_ids, sender):82 """Get full event data for a set of event IDs83 84 Each event which is not found in the event log is represented85 by the `NULL_EVENT` struct in the resulting array.86 87 :param event_ids: An array of event IDs. Fx. obtained by calling88 :meth:`FindEventIds`89 :type event_ids: Array of unsigned 32 bit integers.90 DBus signature au91 :returns: Full event data for all the requested IDs. The92 event data can be conveniently converted into a list of93 :class:`Event` instances by calling *events = map(Event.new_for_struct, result)*94 :rtype: A list of serialized events. 
DBus signature a(asaasay).95 """96 return self._make_events_sendable(self._engine.get_events(ids=event_ids,97 sender=sender))98 99 @dbus.service.method(constants.DBUS_INTERFACE,100 in_signature="(xx)a("+constants.SIG_EVENT+")a("+constants.SIG_EVENT+")uuu",101 out_signature="as")102 def FindRelatedUris(self, time_range, event_templates,103 result_event_templates, storage_state, num_events, result_type):104 """Warning: This API is EXPERIMENTAL and is not fully supported yet.105 106 Get a list of URIs of subjects which frequently occur together107 with events matching `event_templates` within `time_range`.108 The resulting URIs must occur as subjects of events matching109 `result_event_templates` and have storage state110 `storage_state`.111 112 :param time_range: two timestamps defining the timerange for113 the query. When using the Python bindings for Zeitgeist you114 may pass a :class:`TimeRange <zeitgeist.datamodel.TimeRange>`115 instance directly to this method.116 :type time_range: tuple of 64 bit integers,117 DBus signature :const:`(xx)`118 :param event_templates: An array of event templates119 which you want URIs that relate to.120 When using the Python bindings for Zeitgeist you may pass121 a list of :class:`Event <zeitgeist.datamodel.Event>`122 instances directly to this method.123 :type event_templates: array of events,124 DBus signature :const:`a(asaasay)`125 :param result_event_templates: An array of event templates which126 the returned URIs must occur as subjects of.127 When using the Python bindings for Zeitgeist you may pass128 a list of :class:`Event <zeitgeist.datamodel.Event>`129 instances directly to this method.130 :type result_event_templates: array of events,131 DBus signature :const:`a(asaasay)`132 :param storage_state: whether the item is currently known to be133 available. 
The list of possible values is enumerated in the134 :class:`StorageState <zeitgeist.datamodel.StorageState>` class135 :type storage_state: unsigned 32 bit integer, DBus signature :const:`u`136 :param num_events: maximal amount of returned events137 :type num_events: unsigned integer138 :param result_type: unsigned integer 0 for relevancy 1 for recency139 :type order: unsigned integer140 :returns: A list of URIs matching the described criteria141 :rtype: An array of strings, DBus signature :const:`as`.142 """143 event_templates = map(Event, event_templates)144 return self._engine.find_related_uris(time_range, event_templates,145 result_event_templates, storage_state, num_events, result_type)146 147 @dbus.service.method(constants.DBUS_INTERFACE,148 in_signature="(xx)a("+constants.SIG_EVENT+")uuu",149 out_signature="au")150 def FindEventIds(self, time_range, event_templates, storage_state,151 num_events, result_type):152 """Search for events matching a given set of templates and return153 the IDs of matching events.154 155 Use :meth:`GetEvents` passing in the returned IDs to look up156 the full event data.157 158 The matching is done where unset fields in the templates159 are treated as wildcards. If a template has more than one160 subject then events will match the template if any one of their161 subjects match any one of the subject templates.162 163 The fields uri, interpretation, manifestation, origin, and mimetype164 can be prepended with an exclamation mark '!' in order to negate165 the matching.166 167 The fields uri, origin, and mimetype can be prepended with an168 asterisk '*' in order to do truncated matching.169 170 This method is intended for queries potentially returning a171 large result set. 
It is especially useful in cases where only172 a portion of the results are to be displayed at the same time173 (eg., by using paging or dynamic scrollbars), as by holding a174 list of IDs you keep a stable ordering and you can ask for the175 details associated to them in batches, when you need them. For queries176 yielding a small amount of results, or where you need the information177 about all results at once no matter how many of them there are,178 see :meth:`FindEvents`.179 180 :param time_range: two timestamps defining the timerange for181 the query. When using the Python bindings for Zeitgeist you182 may pass a :class:`TimeRange <zeitgeist.datamodel.TimeRange>`183 instance directly to this method184 :type time_range: tuple of 64 bit integers. DBus signature (xx)185 :param event_templates: An array of event templates which the186 returned events should match at least one of.187 When using the Python bindings for Zeitgeist you may pass188 a list of :class:`Event <zeitgeist.datamodel.Event>`189 instances directly to this method.190 :type event_templates: array of events. DBus signature a(asaasay)191 :param storage_state: whether the item is currently known to be192 available. The list of possible values is enumerated in193 :class:`StorageState <zeitgeist.datamodel.StorageState>` class194 :type storage_state: unsigned integer195 :param num_events: maximal amount of returned events196 :type num_events: unsigned integer197 :param order: unsigned integer representing198 a :class:`result type <zeitgeist.datamodel.ResultType>`199 :type order: unsigned integer200 :returns: An array containing the IDs of all matching events,201 up to a maximum of *num_events* events. 
Sorted and grouped202 as defined by the *result_type* parameter.203 :rtype: Array of unsigned 32 bit integers204 """205 time_range = TimeRange(time_range[0], time_range[1])206 event_templates = map(Event, event_templates)207 return self._engine.find_eventids(time_range, event_templates, storage_state,208 num_events, result_type)209 @dbus.service.method(constants.DBUS_INTERFACE,210 in_signature="(xx)a("+constants.SIG_EVENT+")uuu",211 out_signature="a("+constants.SIG_EVENT+")",212 sender_keyword="sender")213 def FindEvents(self, time_range, event_templates, storage_state,214 num_events, result_type, sender):215 """Get events matching a given set of templates.216 217 The matching is done where unset fields in the templates218 are treated as wildcards. If a template has more than one219 subject then events will match the template if any one of their220 subjects match any one of the subject templates.221 222 The fields uri, interpretation, manifestation, origin, and mimetype223 can be prepended with an exclamation mark '!' in order to negate224 the matching.225 226 The fields uri, origin, and mimetype can be prepended with an227 asterisk '*' in order to do truncated matching.228 229 In case you need to do a query yielding a large (or unpredictable)230 result set and you only want to show some of the results at the231 same time (eg., by paging them), use :meth:`FindEventIds`.232 233 :param time_range: two timestamps defining the timerange for234 the query. When using the Python bindings for Zeitgeist you235 may pass a :class:`TimeRange <zeitgeist.datamodel.TimeRange>`236 instance directly to this method237 :type time_range: tuple of 64 bit integers. 
DBus signature (xx)238 :param event_templates: An array of event templates which the239 returned events should match at least one of.240 When using the Python bindings for Zeitgeist you may pass241 a list of :class:`Event <zeitgeist.datamodel.Event>`242 instances directly to this method.243 :type event_templates: array of events. DBus signature a(asaasay)244 :param storage_state: whether the item is currently known to be245 available. The list of possible values is enumerated in246 :class:`StorageState <zeitgeist.datamodel.StorageState>` class247 :type storage_state: unsigned integer248 :param num_events: maximal amount of returned events249 :type num_events: unsigned integer250 :param order: unsigned integer representing251 a :class:`result type <zeitgeist.datamodel.ResultType>`252 :type order: unsigned integer253 :returns: Full event data for all the requested IDs, up to a maximum254 of *num_events* events, sorted and grouped as defined by the255 *result_type* parameter. The event data can be conveniently256 converted into a list of :class:`Event` instances by calling257 *events = map(Event.new_for_struct, result)*258 :rtype: A list of serialized events. DBus signature a(asaasay).259 """260 time_range = TimeRange(time_range[0], time_range[1])261 event_templates = map(Event, event_templates)262 return self._make_events_sendable(self._engine.find_events(time_range,263 event_templates, storage_state, num_events, result_type, sender))264 # Writing stuff265 266 @dbus.service.method(constants.DBUS_INTERFACE,267 in_signature="a("+constants.SIG_EVENT+")",268 out_signature="au",269 sender_keyword="sender")270 def InsertEvents(self, events, sender):271 """Inserts events into the log. 
Returns an array containing the IDs272 of the inserted events273 274 Each event which failed to be inserted into the log (either by275 being blocked or because of an error) will be represented by `0`276 in the resulting array.277 278 One way events may end up being blocked is if they match any279 of the :ref:`blacklist templates <org_gnome_zeitgeist_Blacklist>`.280 281 Any monitors with matching templates will get notified about282 the insertion. Note that the monitors are notified *after* the283 events have been inserted.284 285 :param events: List of events to be inserted in the log.286 If you are using the Python bindings you may pass287 :class:`Event <zeitgeist.datamodel.Event>` instances288 directly to this method289 :returns: An array containing the event IDs of the inserted290 events. In case any of the events where already logged,291 the ID of the existing event will be returned. `0` as ID292 indicates a failed insert into the log.293 :rtype: Array of unsigned 32 bits integers. DBus signature au.294 """295 if not events : return []296 events = map(Event, events)297 event_ids = self._engine.insert_events(events, sender)298 299 _events = []300 min_stamp = events[0].timestamp301 max_stamp = min_stamp302 for ev, ev_id in zip(events, event_ids):303 if not ev_id:304 # event has not been inserted because of an error or 305 # because of being blocked by an extension306 # this is why we do not notify clients about this event307 continue308 _ev = Event(ev)309 _ev[0][Event.Id] = ev_id310 _events.append(_ev)311 min_stamp = min(min_stamp, _ev.timestamp)312 max_stamp = max(max_stamp, _ev.timestamp)313 self._notifications.notify_insert(TimeRange(min_stamp, max_stamp), _events)314 315 return event_ids316 317 @dbus.service.method(constants.DBUS_INTERFACE,318 in_signature="au",319 out_signature="(xx)",320 sender_keyword="sender")321 def DeleteEvents(self, event_ids, sender):322 """Delete a set of events from the log given their IDs323 324 :param event_ids: list of event IDs 
obtained, for example, by calling325 :meth:`FindEventIds`326 :type event_ids: list of integers327 """328 timestamps = self._engine.delete_events(event_ids, sender=sender)329 if timestamps:330 # We need to check the return value, as the events could already331 # have been deleted before or the IDs might even have been invalid.332 self._notifications.notify_delete(333 TimeRange(timestamps[0], timestamps[1]), event_ids)334 if timestamps is None:335 # unknown event id, see doc of delete_events()336 return (-1, -1)337 timestamp_start, timestamp_end = timestamps338 timestamp_start = timestamp_start if timestamp_start is not None else -1339 timestamp_end = timestamp_end if timestamp_end is not None else -1340 return (timestamp_start, timestamp_end)341 @dbus.service.method(constants.DBUS_INTERFACE, in_signature="", out_signature="")342 def DeleteLog(self):343 """Delete the log file and all its content344 345 This method is used to delete the entire log file and all its346 content in one go. To delete specific subsets use347 :meth:`FindEventIds` combined with :meth:`DeleteEvents`.348 """349 self._engine.delete_log()350 351 @dbus.service.method(constants.DBUS_INTERFACE)352 def Quit(self):353 """Terminate the running Zeitgeist engine process; use with caution,354 this action must only be triggered with the user's explicit consent,355 as it will affect all applications using Zeitgeist"""356 self._engine.close()357 if self._mainloop:358 self._mainloop.quit()359 # remove the interface from all busses (in our case from the session bus)360 self.remove_from_connection()361 362 # Properties interface363 @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,364 in_signature="ss", out_signature="v")365 def Get(self, interface_name, property_name):366 if interface_name != constants.DBUS_INTERFACE:367 raise ValueError(368 "'%s' doesn't know anything about the '%s' interface" \369 %(constants.DBUS_INTERFACE, interface_name)370 )371 try:372 return 
self._dbus_properties[property_name].fget(self)373 except KeyError, e:374 raise AttributeError(property_name)375 @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,376 in_signature="ssv", out_signature="")377 def Set(self, interface_name, property_name, value):378 if interface_name != constants.DBUS_INTERFACE:379 raise ValueError(380 "'%s' doesn't know anything about the '%s' interface" \381 %(constants.DBUS_INTERFACE, interface_name)382 )383 try:384 prop = self._dbus_properties[property_name].fset(self, value)385 except (KeyError, TypeError), e:386 raise AttributeError(property_name)387 @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,388 in_signature="s", out_signature="a{sv}")389 def GetAll(self, interface_name):390 if interface_name != constants.DBUS_INTERFACE:391 raise ValueError(392 "'%s' doesn't know anything about the '%s' interface" \393 %(constants.DBUS_INTERFACE, interface_name)394 )395 return dict((k, v.fget(self)) for (k,v) in self._dbus_properties.items())396 397 # Instrospection Interface398 399 @dbus.service.method(dbus.INTROSPECTABLE_IFACE, in_signature="", out_signature="s",400 path_keyword="object_path", connection_keyword="connection")401 def Introspect(self, object_path, connection):402 data = dbus.service.Object.Introspect(self, object_path, connection)403 xml = ElementTree.fromstring(data)404 for iface in xml.findall("interface"):405 if iface.attrib["name"] != constants.DBUS_INTERFACE:406 continue407 for prop_name, prop_func in self._dbus_properties.iteritems():408 prop = {"name": prop_name}409 if prop_func.fset is not None:410 prop["access"] = "readwrite"411 else:412 prop["access"] = "read"413 prop["type"] = prop_func.out_signature414 iface.append(ElementTree.Element("property", prop))415 return ElementTree.tostring(xml, encoding="UTF-8")416 417 # Notifications interface418 419 @dbus.service.method(constants.DBUS_INTERFACE,420 in_signature="o(xx)a("+constants.SIG_EVENT+")", sender_keyword="owner")421 def InstallMonitor(self, 
monitor_path, time_range, event_templates, owner=None):422 """Register a client side monitor object to receive callbacks when423 events matching *time_range* and *event_templates* are inserted or424 deleted.425 426 The monitor object must implement the interface :ref:`org.gnome.zeitgeist.Monitor <org_gnome_zeitgeist_Monitor>`427 428 The monitor templates are matched exactly like described in429 :meth:`FindEventIds`.430 431 :param monitor_path: DBus object path to the client side monitor object. DBus signature o.432 :param time_range: A two-tuple with the time range monitored433 events must fall within. Recall that time stamps are in434 milliseconds since the Epoch. DBus signature (xx)435 :param event_templates: Event templates that events must match436 in order to trigger the monitor. Just like :meth:`FindEventIds`.437 DBus signature a(asaasay)438 """439 event_templates = map(Event, event_templates)440 time_range = TimeRange(time_range[0], time_range[1])441 self._notifications.install_monitor(owner, monitor_path, time_range, event_templates)442 443 @dbus.service.method(constants.DBUS_INTERFACE,444 in_signature="o", sender_keyword="owner")445 def RemoveMonitor(self, monitor_path, owner=None):446 """Remove a monitor installed with :meth:`InstallMonitor`447 448 :param monitor_path: DBus object path of monitor to remove as449 supplied to :meth:`InstallMonitor`.450 """...
api_run.py
Source:api_run.py
...128 :type name: str129 """130 self._name = name131 @property132 def storage_state(self):133 """Gets the storage_state of this ApiRun. # noqa: E501134 :return: The storage_state of this ApiRun. # noqa: E501135 :rtype: ApiRunStorageState136 """137 return self._storage_state138 @storage_state.setter139 def storage_state(self, storage_state):140 """Sets the storage_state of this ApiRun.141 :param storage_state: The storage_state of this ApiRun. # noqa: E501142 :type storage_state: ApiRunStorageState143 """144 self._storage_state = storage_state145 @property146 def description(self):147 """Gets the description of this ApiRun. # noqa: E501148 :return: The description of this ApiRun. # noqa: E501149 :rtype: str150 """151 return self._description152 @description.setter153 def description(self, description):...
LambdaTest’s Playwright tutorial will give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!