Best Python code snippet using localstack_python
clean_owners.py
Source:clean_owners.py
from collections import defaultdict
from copy import copy
from functools import partial
from datetime import datetime
from mobile_endpoint.case.models import CommCareCase
from mobile_endpoint.models import OwnershipCleanlinessFlag
from mobile_endpoint.restore.cleanliness import get_case_footprint_info
from mobile_endpoint.restore.data_providers.case.load_testing import append_update_to_response
from mobile_endpoint.restore.data_providers.case.utils import get_case_sync_updates, CaseStub
from mobile_endpoint.synclog.models import SimplifiedSyncLog, LOG_FORMAT_SIMPLIFIED, IndexTree


def get_owner_id(case):
    """Return ``case.owner_id``, falling back to ``case.user_id`` when it is None."""
    return case.owner_id if case.owner_id is not None else case.user_id


def get_case_payload(restore_state):
    """Build and return the case sync payload for ``restore_state``."""
    sync_op = CleanOwnerCaseSyncOperation(restore_state)
    return sync_op.get_payload()


# Maximum number of case ids loaded from the DAO per batch in get_payload().
chunk_size = 1000


class CleanOwnerCaseSyncOperation(object):
    """
    Case sync operation that uses per-owner "cleanliness" flags to decide
    which case ids are candidates for a restore.

    Reads the domain, owner ids, last sync log and DAO from ``restore_state``
    and writes the updated sync log back onto ``restore_state.current_sync_log``.
    """

    def __init__(self, restore_state):
        self.restore_state = restore_state
        self.dao = restore_state.dao
        # lazily populated by the ``cleanliness_flags`` property
        self._cleanliness_flags = None

    @property
    def cleanliness_flags(self):
        """
        Dict built from (owner_id, is_clean) rows for this restore's domain
        and owner ids. Queried on first access and cached on the instance.
        """
        if self._cleanliness_flags is None:
            self._cleanliness_flags = dict(
                OwnershipCleanlinessFlag.query.with_entities(
                    OwnershipCleanlinessFlag.owner_id,
                    OwnershipCleanlinessFlag.is_clean
                ).filter(
                    OwnershipCleanlinessFlag.domain == self.restore_state.domain,
                    OwnershipCleanlinessFlag.owner_id.in_(self.restore_state.owner_ids),
                )
            )
        return self._cleanliness_flags

    def is_clean(self, owner_id):
        """Return the owner's cleanliness flag; owners without a flag count as not clean."""
        return self.cleanliness_flags.get(owner_id, False)

    def get_payload(self):
        """
        Build the restore response for all owners in the restore state.

        Candidate case ids are processed in batches of ``chunk_size``. Every
        update returned by ``get_case_sync_updates`` is appended to the
        response, and any indexed case not already scheduled is queued so it
        is checked in a later batch. Afterwards the current sync log is
        rewritten in the simplified format.

        Returns the response object created by ``restore_state.restore_class()``.
        """
        response = self.restore_state.restore_class()
        case_ids_to_sync = self._get_candidate_case_ids()

        all_syncing = copy(case_ids_to_sync)
        all_indices = defaultdict(set)
        all_dependencies_syncing = set()
        needs_sync = partial(case_needs_to_sync, last_sync_log=self.restore_state.last_sync_log)
        while case_ids_to_sync:
            ids = pop_ids(case_ids_to_sync, chunk_size)
            # materialize the batch: a lazy filter() object could only be consumed once
            case_batch = [case for case in self.dao.get_cases(ids) if needs_sync(case)]
            updates = get_case_sync_updates(
                self.restore_state.domain, case_batch, self.restore_state.last_sync_log
            )
            for update in updates:
                case = update.case
                append_update_to_response(response, update, self.restore_state)

                # update the indices in the new sync log
                if case.indices:
                    all_indices[case.id] = {index.identifier: index.referenced_id for index in case.indices}
                    # and double check footprint for non-live cases
                    for index in case.indices:
                        if index.referenced_id not in all_syncing:
                            case_ids_to_sync.add(index.referenced_id)

                if not _is_live(case, self.restore_state):
                    all_dependencies_syncing.add(case.id)

            # commtrack ledger sections for this batch
            # TODO prototype: support stock
            # commtrack_elements = get_stock_payload(
            #     self.restore_state.project, self.restore_state.stock_settings,
            #     [CaseStub(update.case.id, update.case.type) for update in updates]
            # )
            # response.extend(commtrack_elements)

            # add any new values to all_syncing
            all_syncing = all_syncing | case_ids_to_sync

        self._update_sync_log(all_syncing, all_dependencies_syncing, all_indices)
        return response

    def _get_candidate_case_ids(self):
        """Return the initial set of case ids to consider for this sync."""
        case_ids_to_sync = set()
        for owner_id in self.restore_state.owner_ids:
            case_ids_to_sync = case_ids_to_sync | set(self.get_case_ids_for_owner(owner_id))

        if (not self.restore_state.is_initial and
                any(not self.is_clean(owner_id) for owner_id in self.restore_state.owner_ids)):
            # if it's a steady state sync and we have any dirty owners, then we also need to
            # include ALL cases on the phone that have been modified since the last sync as
            # possible candidates to sync (since they may have been closed or reassigned by someone else)

            # don't bother checking ones we've already decided to check
            other_ids_to_check = self.restore_state.last_sync_log.case_ids_on_phone - case_ids_to_sync
            case_ids_to_sync = case_ids_to_sync | set(filter_cases_modified_since(
                self.dao, self.restore_state.domain, list(other_ids_to_check), self.restore_state.last_sync_log.date
            ))
        return case_ids_to_sync

    def _update_sync_log(self, all_syncing, all_dependencies_syncing, all_indices):
        """Rewrite the current sync log in the simplified format with this sync's state."""
        # update sync token - marking it as the new format
        self.restore_state.current_sync_log = SimplifiedSyncLog.wrap(
            self.restore_state.current_sync_log.to_json()
        )
        self.restore_state.current_sync_log.log_format = LOG_FORMAT_SIMPLIFIED
        index_tree = IndexTree(indices=all_indices)
        case_ids_on_phone = all_syncing
        primary_cases_syncing = all_syncing - all_dependencies_syncing
        if not self.restore_state.is_initial:
            case_ids_on_phone = case_ids_on_phone | self.restore_state.last_sync_log.case_ids_on_phone
            # subtract primary cases from dependencies since they must be newly primary
            all_dependencies_syncing = all_dependencies_syncing | (
                self.restore_state.last_sync_log.dependent_case_ids_on_phone -
                primary_cases_syncing
            )
            index_tree = self.restore_state.last_sync_log.index_tree.apply_updates(index_tree)

        self.restore_state.current_sync_log.case_ids_on_phone = case_ids_on_phone
        self.restore_state.current_sync_log.dependent_case_ids_on_phone = all_dependencies_syncing
        self.restore_state.current_sync_log.index_tree = index_tree

    def get_case_ids_for_owner(self, owner_id):
        """Return the base collection of case ids to consider for ``owner_id``."""
        if self.is_clean(owner_id):
            if self.restore_state.is_initial:
                # for a clean owner's initial sync the base set is just the open ids
                return set(self.dao.get_open_case_ids(self.restore_state.domain, owner_id))
            else:
                # for a clean owner's steady state sync, the base set is anything modified since last sync
                return set(self.dao.get_case_ids_modified_with_owner_since(
                    self.restore_state.domain, owner_id, self.restore_state.last_sync_log.date
                ))
        else:
            # right now just return the whole footprint and do any filtering later
            return get_case_footprint_info(self.dao, self.restore_state.domain, owner_id).all_ids


def _is_live(case, restore_state):
    """
    Given a case and a restore state object, return whether or not the case is "live"
    (directly owned by this sync and open), or "dependent" (needed by another case)
    """
    return not case.closed and get_owner_id(case) in restore_state.owner_ids


def filter_cases_modified_since(dao, domain, case_ids, reference_date):
    """
    Given a domain, case_ids, and a reference date, filter the case ids to only those
    that have been modified since that reference date.

    This is a generator. Ids for which the DAO returns no date are compared
    using 1900-01-01 as their last-modified date.
    """
    fallback_date = datetime(1900, 1, 1)
    last_modified_date_dict = dao.get_last_modified_dates(domain, case_ids)
    for case_id in case_ids:
        if last_modified_date_dict.get(case_id, fallback_date) > reference_date:
            yield case_id


def case_needs_to_sync(case, last_sync_log):
    """
    Return True when ``case`` should be included in the sync.

    A case syncs when there is no last sync log, when its owner was not on
    the phone at the last sync, or when it has an action dated after the
    last sync that carries a different sync log id.
    """
    owner_id = case.owner_id or case.user_id  # need to fallback to user_id for v1 cases
    if not last_sync_log or owner_id not in last_sync_log.owner_ids_on_phone:
        # initial sync or new owner IDs always sync down everything
        return True
    elif case.server_modified_on >= last_sync_log.date:
        # check all of the actions since last sync for one that had a different sync token
        return any(
            action.server_date > last_sync_log.date and action.sync_log_id != last_sync_log.id
            for action in case.actions
        )
    # if the case wasn't touched since last sync, and the phone was aware of this owner_id last time
    # don't worry about it
    return False


def pop_ids(set_, how_many):
    """
    Remove up to ``how_many`` ids from ``set_`` (mutating it) and return them as a list.

    Fewer ids are returned when the set runs out first.
    """
    result = []
    for _ in range(how_many):
        try:
            result.append(set_.pop())
        except KeyError:
            # the set is exhausted, so further iterations cannot yield anything
            break
    # get_payload() passes this result to dao.get_cases(), so it must be returned
    return result
test_restore_state.py
Source:test_restore_state.py
"""The tests for the Restore component."""
import asyncio
from datetime import timedelta
from unittest.mock import patch, MagicMock
from homeassistant.setup import setup_component
from homeassistant.const import EVENT_HOMEASSISTANT_START
from homeassistant.core import CoreState, split_entity_id, State
import homeassistant.util.dt as dt_util
from homeassistant.components import input_boolean, recorder
from homeassistant.helpers.restore_state import (
    async_get_last_state, DATA_RESTORE_CACHE)
from homeassistant.components.recorder.models import RecorderRuns, States
from tests.common import (
    get_test_home_assistant, mock_coro, init_recorder_component,
    mock_component)


@asyncio.coroutine
def test_caching_data(hass):
    """Test that we cache data."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting
    states = [
        State('input_boolean.b0', 'on'),
        State('input_boolean.b1', 'on'),
        State('input_boolean.b2', 'on'),
    ]
    # Patch the three helpers in restore_state so no real recorder is needed:
    # a last run exists, get_states returns the list above, and the
    # connection wait yields True.
    with patch('homeassistant.helpers.restore_state.last_recorder_run',
               return_value=MagicMock(end=dt_util.utcnow())), \
            patch('homeassistant.helpers.restore_state.get_states',
                  return_value=states), \
            patch('homeassistant.helpers.restore_state.wait_connection_ready',
                  return_value=mock_coro(True)):
        state = yield from async_get_last_state(hass, 'input_boolean.b1')
    # The cache must hold every patched state, keyed by entity id.
    assert DATA_RESTORE_CACHE in hass.data
    assert hass.data[DATA_RESTORE_CACHE] == {st.entity_id: st for st in states}
    assert state is not None
    assert state.entity_id == 'input_boolean.b1'
    assert state.state == 'on'
    # Firing the start event is expected to drop the cache again.
    hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
    yield from hass.async_block_till_done()
    assert DATA_RESTORE_CACHE not in hass.data


@asyncio.coroutine
def test_hass_running(hass):
    """Test that cache cannot be accessed while hass is running."""
    mock_component(hass, 'recorder')
    # Unlike the other tests, hass.state is NOT set to CoreState.starting here.
    states = [
        State('input_boolean.b0', 'on'),
        State('input_boolean.b1', 'on'),
        State('input_boolean.b2', 'on'),
    ]
    with patch('homeassistant.helpers.restore_state.last_recorder_run',
               return_value=MagicMock(end=dt_util.utcnow())), \
            patch('homeassistant.helpers.restore_state.get_states',
                  return_value=states), \
            patch('homeassistant.helpers.restore_state.wait_connection_ready',
                  return_value=mock_coro(True)):
        state = yield from async_get_last_state(hass, 'input_boolean.b1')
    assert state is None


@asyncio.coroutine
def test_not_connected(hass):
    """Test that cache cannot be accessed if db connection times out."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting
    states = [State('input_boolean.b1', 'on')]
    # wait_connection_ready yields False in this scenario.
    with patch('homeassistant.helpers.restore_state.last_recorder_run',
               return_value=MagicMock(end=dt_util.utcnow())), \
            patch('homeassistant.helpers.restore_state.get_states',
                  return_value=states), \
            patch('homeassistant.helpers.restore_state.wait_connection_ready',
                  return_value=mock_coro(False)):
        state = yield from async_get_last_state(hass, 'input_boolean.b1')
    assert state is None


@asyncio.coroutine
def test_no_last_run_found(hass):
    """Test that cache cannot be accessed if no last run found."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting
    states = [State('input_boolean.b1', 'on')]
    # last_recorder_run returns None in this scenario.
    with patch('homeassistant.helpers.restore_state.last_recorder_run',
               return_value=None), \
            patch('homeassistant.helpers.restore_state.get_states',
                  return_value=states), \
            patch('homeassistant.helpers.restore_state.wait_connection_ready',
                  return_value=mock_coro(True)):
        state = yield from async_get_last_state(hass, 'input_boolean.b1')
    assert state is None


@asyncio.coroutine
def test_cache_timeout(hass):
    """Test that cache timeout returns none."""
    mock_component(hass, 'recorder')
    hass.state = CoreState.starting
    states = [State('input_boolean.b1', 'on')]

    @asyncio.coroutine
    def timeout_coro():
        # Stand-in for wait_connection_ready that always times out.
        raise asyncio.TimeoutError()

    with patch('homeassistant.helpers.restore_state.last_recorder_run',
               return_value=MagicMock(end=dt_util.utcnow())), \
            patch('homeassistant.helpers.restore_state.get_states',
                  return_value=states), \
            patch('homeassistant.helpers.restore_state.wait_connection_ready',
                  return_value=timeout_coro()):
        state = yield from async_get_last_state(hass, 'input_boolean.b1')
    assert state is None


def _add_data_in_last_run(hass, entities):
    """Add test data in the last recorder_run.

    ``entities`` maps entity_id -> state string. One RecorderRuns row is
    added (started 40 minutes ago, ended 10 minutes ago), plus one States
    row per entity stamped 30 minutes ago, i.e. inside that run.
    """
    # pylint: disable=protected-access
    t_now = dt_util.utcnow() - timedelta(minutes=10)
    t_min_1 = t_now - timedelta(minutes=20)
    t_min_2 = t_now - timedelta(minutes=30)
    with recorder.session_scope(hass=hass) as session:
        session.add(RecorderRuns(
            start=t_min_2,
            end=t_now,
            created=t_min_2
        ))
        for entity_id, state in entities.items():
            session.add(States(
                entity_id=entity_id,
                domain=split_entity_id(entity_id)[0],
                state=state,
                attributes='{}',
                last_changed=t_min_1,
                last_updated=t_min_1,
                created=t_min_1))


def test_filling_the_cache():
    """Test filling the cache from the DB."""
    test_entity_id1 = 'input_boolean.b1'
    test_entity_id2 = 'input_boolean.b2'
    hass = get_test_home_assistant()
    hass.state = CoreState.starting
    init_recorder_component(hass)
    # Seed the recorder with a finished run containing both entities.
    _add_data_in_last_run(hass, {
        test_entity_id1: 'on',
        test_entity_id2: 'off',
    })
    hass.block_till_done()
    setup_component(hass, input_boolean.DOMAIN, {
        input_boolean.DOMAIN: {
            'b1': None,
            'b2': None,
        }})
    hass.start()
    # Both entities should come up with the states seeded above.
    state = hass.states.get('input_boolean.b1')
    assert state
    assert state.state == 'on'
    state = hass.states.get('input_boolean.b2')
    assert state
    assert state.state == 'off'
    # NOTE(review): the source snippet is cut off here; any remaining
    # statements of this test (e.g. teardown) are not visible.
persistent_widgets.py
Source:persistent_widgets.py
# NOTE(review): the Qt names used below (QCheckBox, QLineEdit, QTextEdit,
# QListWidget, QTreeWidget, QComboBox, QAction, QSettings, Qt) are imported
# in a part of the file that is not visible in this snippet.


def _as_int(value):
    """Return ``value`` converted to int, or None when it cannot be converted.

    Stored settings may be read back as strings (this file already compares
    against 'true'/'false' for that reason), so integers are coerced before
    they are compared or used as an index.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _as_item_list(value):
    """Return a stored selection as a list of item texts.

    A bare string is wrapped in a list so that ``text in selection`` is a
    membership test, not a substring test; an empty string means no selection.
    """
    if isinstance(value, str):
        return [value] if value else []
    return value


class PersistentCheckBox(QCheckBox):
    """Check box that saves its check state in QSettings under ``name``."""

    def __init__(self, name, changed=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        # restore before connecting signals so the restore itself fires no callbacks
        self.restore_state()
        if changed:
            self.stateChanged.connect(changed)
        self.stateChanged.connect(lambda: QSettings().setValue(self.name, self.checkState()))

    def restore_state(self):
        """Apply the stored check state; anything else leaves the box as constructed."""
        prev_state = _as_int(QSettings().value(self.name, 0))
        if prev_state == int(Qt.Checked):
            self.setCheckState(Qt.Checked)
        elif prev_state == int(Qt.PartiallyChecked):
            self.setCheckState(Qt.PartiallyChecked)


class PersistentLineEdit(QLineEdit):
    """Line edit that saves its text in QSettings under ``name``."""

    def __init__(self, name, *args, default='', changed=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.default = default
        self.restore_state()
        if changed:
            self.textChanged.connect(changed)
        self.textChanged.connect(lambda: QSettings().setValue(self.name, self.text()))

    def restore_state(self):
        """Set the text to the stored value, or to ``default`` when none is stored."""
        self.setText(str(QSettings().value(self.name, self.default)))


class PersistentTextEdit(QTextEdit):
    """Text edit that saves its plain text in QSettings under ``name``."""

    def __init__(self, name, *args, default='', changed=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.default = default
        self.restore_state()
        if changed:
            self.textChanged.connect(changed)
        self.textChanged.connect(lambda: QSettings().setValue(self.name, self.toPlainText()))

    def restore_state(self):
        """Set the text to the stored value, or to ``default`` when none is stored."""
        self.setText(str(QSettings().value(self.name, self.default)))


class PersistentListWidget(QListWidget):
    """List widget that saves the texts of its selected items in QSettings under ``name``."""

    def __init__(self, name, items=None, default=None, changed=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        # None replaces the former shared mutable ``[]`` defaults
        self.default_selection = [] if default is None else default
        if items:
            self.addItems(items)
        self.restore_state()

        if changed:
            self.itemSelectionChanged.connect(changed)
        self.itemSelectionChanged.connect(lambda: QSettings().setValue(self.name, self.selected_items()))

    def selected_items(self):
        """Return the texts of the currently selected items."""
        return [item.text() for item in self.selectedItems()]

    def restore_state(self):
        """Select every item whose text is in the stored (or default) selection."""
        prev_items = _as_item_list(QSettings().value(self.name, self.default_selection))
        if prev_items:
            for i in range(self.count()):
                if self.item(i).text() in prev_items:
                    self.item(i).setSelected(True)


class PersistentTreeWidget(QTreeWidget):
    """Tree widget that saves the ``index_column`` texts of its selected items in QSettings."""

    def __init__(self, name, items=None, index_column=0, default=None, changed=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        # None replaces the former shared mutable ``[]`` defaults
        self.default_selection = [] if default is None else default
        self.index_column = index_column
        if items:
            # NOTE(review): addItems/count/item look like QListWidget methods;
            # confirm they exist on this tree widget (QTreeWidget itself uses
            # addTopLevelItems/topLevelItemCount/topLevelItem).
            self.addItems(items)
        self.restore_state()

        if changed:
            self.itemSelectionChanged.connect(changed)
        self.itemSelectionChanged.connect(lambda: QSettings().setValue(self.name, self.selected_items()))

    def selected_items(self):
        """Return the ``index_column`` texts of the currently selected items."""
        return [item.text(self.index_column) for item in self.selectedItems()]

    def restore_state(self):
        """Select every item whose ``index_column`` text is in the stored (or default) selection."""
        prev_items = _as_item_list(QSettings().value(self.name, self.default_selection))
        if prev_items:
            for i in range(self.count()):
                if self.item(i).text(self.index_column) in prev_items:
                    self.item(i).setSelected(True)


class PersistentComboBox(QComboBox):
    """Combo box that saves its current index in QSettings under ``name``."""

    def __init__(self, name, items=None, changed=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        if items:
            self.addItems(items)
        self.restore_state()
        if changed:
            self.currentTextChanged.connect(changed)
        self.currentTextChanged.connect(lambda: QSettings().setValue(self.name, self.currentIndex()))

    def restore_state(self):
        """Restore the stored index; values that are not integers are ignored."""
        prev_index = _as_int(QSettings().value(self.name, 0))
        if prev_index is not None:
            self.setCurrentIndex(prev_index)


class PersistentCheckableAction(QAction):
    """Checkable action that saves its checked flag in QSettings under ``name``."""

    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name
        self.setCheckable(True)
        self.restore_state()
        self.triggered.connect(lambda: QSettings().setValue(self.name, self.isChecked()))

    def restore_state(self):
        """Apply the stored checked flag, which is compared as 'true'/'false' text."""
        prev_state = QSettings().value(self.name, 0)
        if prev_state == 'true':
            self.setChecked(True)
        elif prev_state == 'false':
            # NOTE(review): the source snippet is cut off right after this
            # condition; the symmetric setChecked(False) is assumed - confirm.
            self.setChecked(False)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!