Best Python code snippet using localstack_python
workflow_test.py
Source:workflow_test.py
...228 workflow_client,229 mocked_boto_client,230 oldest_start_date,231 ):232 response = workflow_client.count_closed_workflow_executions(oldest_start_date=oldest_start_date)233 assert response.count == 321234 assert response.truncated235 mocked_boto_client.count_closed_workflow_executions.assert_called_with(236 domain=workflow_config.domain,237 startTimeFilter=dict(oldestDate=oldest_start_date)238 )239 def test_count_closed_workflow_executions_with_start_time_range(240 self,241 workflow_config,242 workflow_client,243 mocked_boto_client,244 oldest_start_date,245 latest_start_date,246 ):247 workflow_client.count_closed_workflow_executions(oldest_start_date=oldest_start_date, latest_start_date=latest_start_date)248 mocked_boto_client.count_closed_workflow_executions.assert_called_with(249 domain=workflow_config.domain,250 startTimeFilter=dict(251 oldestDate=oldest_start_date,252 latestDate=latest_start_date,253 )254 )255 def test_count_closed_workflow_executions_with_oldest_close_time(256 self,257 workflow_config,258 workflow_client,259 mocked_boto_client,260 oldest_close_date,261 ):262 workflow_client.count_closed_workflow_executions(oldest_close_date=oldest_close_date)263 mocked_boto_client.count_closed_workflow_executions.assert_called_with(264 domain=workflow_config.domain,265 closeTimeFilter=dict(oldestDate=oldest_close_date),266 )267 def test_count_closed_workflow_executions_with_close_time_range(268 self,269 workflow_config,270 workflow_client,271 mocked_boto_client,272 oldest_close_date,273 latest_close_date,274 ):275 workflow_client.count_closed_workflow_executions(oldest_close_date=oldest_close_date, latest_close_date=latest_close_date)276 mocked_boto_client.count_closed_workflow_executions.assert_called_with(277 domain=workflow_config.domain,278 closeTimeFilter=dict(oldestDate=oldest_close_date, latestDate=latest_close_date),279 )280 def test_count_closed_workflow_executions_with_oldest_start_time_and_workflow_name(281 self,282 workflow_config,283 
workflow_client,284 mocked_boto_client,285 oldest_start_date,286 ):287 workflow_client.count_closed_workflow_executions(288 oldest_start_date=oldest_start_date,289 workflow_name='test',290 version='test',291 )292 mocked_boto_client.count_closed_workflow_executions.assert_called_with(293 domain=workflow_config.domain,294 startTimeFilter=dict(oldestDate=oldest_start_date),295 typeFilter=dict(296 name='test',297 version='test',298 ),299 )300 def test_count_closed_workflow_executions_with_oldest_close_time_and_workflow_name(301 self,302 workflow_config,303 workflow_client,304 mocked_boto_client,305 oldest_close_date,306 ):307 workflow_client.count_closed_workflow_executions(308 oldest_close_date=oldest_close_date,309 workflow_name='test',310 version='test',311 )312 mocked_boto_client.count_closed_workflow_executions.assert_called_with(313 domain=workflow_config.domain,314 closeTimeFilter=dict(oldestDate=oldest_close_date),315 typeFilter=dict(316 name='test',317 version='test',318 ),319 )320 def test_count_closed_workflow_executions_with_oldest_start_time_and_tag(321 self,322 workflow_config,323 workflow_client,324 mocked_boto_client,325 oldest_start_date,326 ):327 workflow_client.count_closed_workflow_executions(328 oldest_start_date=oldest_start_date,329 tag='test',330 )331 mocked_boto_client.count_closed_workflow_executions.assert_called_with(332 domain=workflow_config.domain,333 startTimeFilter=dict(oldestDate=oldest_start_date),334 tagFilter=dict(tag='test'),335 )336 def test_count_closed_workflow_executions_with_oldest_close_time_and_tag(337 self,338 workflow_config,339 workflow_client,340 mocked_boto_client,341 oldest_close_date,342 ):343 workflow_client.count_closed_workflow_executions(344 oldest_close_date=oldest_close_date,345 tag='test',346 )347 mocked_boto_client.count_closed_workflow_executions.assert_called_with(348 domain=workflow_config.domain,349 closeTimeFilter=dict(oldestDate=oldest_close_date),350 tagFilter=dict(tag='test'),351 )352 def 
test_count_closed_workflow_executions_with_oldest_start_time_and_workflow_id(353 self,354 workflow_config,355 workflow_client,356 mocked_boto_client,357 oldest_start_date,358 ):359 workflow_client.count_closed_workflow_executions(360 oldest_start_date=oldest_start_date,361 workflow_id='test',362 )363 mocked_boto_client.count_closed_workflow_executions.assert_called_with(364 domain=workflow_config.domain,365 startTimeFilter=dict(oldestDate=oldest_start_date),366 executionFilter=dict(workflowId='test'),367 )368 def test_count_closed_workflow_executions_with_oldest_close_time_and_workflow_id(369 self,370 workflow_config,371 workflow_client,372 mocked_boto_client,373 oldest_close_date,374 ):375 workflow_client.count_closed_workflow_executions(376 oldest_close_date=oldest_close_date,377 workflow_id='test',378 )379 mocked_boto_client.count_closed_workflow_executions.assert_called_with(380 domain=workflow_config.domain,381 closeTimeFilter=dict(oldestDate=oldest_close_date),382 executionFilter=dict(workflowId='test'),383 )384 def test_count_closed_workflow_executions_with_oldest_start_time_and_close_status(385 self,386 workflow_config,387 workflow_client,388 mocked_boto_client,389 oldest_start_date,390 ):391 workflow_client.count_closed_workflow_executions(392 oldest_start_date=oldest_start_date,393 close_status='test',394 )395 mocked_boto_client.count_closed_workflow_executions.assert_called_with(396 domain=workflow_config.domain,397 startTimeFilter=dict(oldestDate=oldest_start_date),398 closeStatusFilter=dict(status='test'),399 )400 def test_count_closed_workflow_executions_with_oldest_close_time_and_close_status(401 self,402 workflow_config,403 workflow_client,404 mocked_boto_client,405 oldest_close_date,406 ):407 workflow_client.count_closed_workflow_executions(408 oldest_close_date=oldest_close_date,409 close_status='test',410 )411 mocked_boto_client.count_closed_workflow_executions.assert_called_with(412 domain=workflow_config.domain,413 
closeTimeFilter=dict(oldestDate=oldest_close_date),414 closeStatusFilter=dict(status='test'),...
workflow.py
Source:workflow.py
...112 startTimeFilter=start_time_filter_dict['startTimeFilter'],113 **workflow_filter_dict114 )115 return CountWorkflowsResult(count=response['count'], truncated=response['truncated'])116 def count_closed_workflow_executions(117 self,118 oldest_start_date=None,119 latest_start_date=None,120 oldest_close_date=None,121 latest_close_date=None,122 workflow_name=None,123 version=None,124 tag=None,125 workflow_id=None,126 close_status=None,127 ):128 """129 Count the number of closed workflows for a domain. You can pass in filtering criteria.130 This operation is eventually consistent. The results are best effort and may not exactly reflect recent updates131 and changes. Worklfows started or closed near the time when calling count_open_workflow_executions may not be reflected132 Passthrough to :meth:~SWF.Client.count_closed_workflow_executions``133 startTimeFilter and closeTimeFilter are mutually exclusive. You MUST specify one of these in a request but not both.134 closeStatusFilter , executionFilter , typeFilter and tagFilter are mutually exclusive. You can specify at most135 one of these in a request.136 :param oldest_start_date: datetime. Specifies the oldest start date and time to return.137 :param latest_start_date: datetime. Specifies the latest start date and time to return.138 :param oldest_close_date: datetime. Specifies the oldest close date and time to return.139 :param latest_close_date: datetime. Specifies the latest close date and time to return.140 :param workflow_name: string. Required for typeFilter. Specifies the type of the workflow executions to be counted.141 :param version: string. Optional for typeFilter. Version of the workflow type.142 :param tag: string. Required for tagFilter. Specifies the tag that must be associated with the execution for it143 to meet the filter criteria.144 :param workflow_id: string. Required for executionFilter. 
The workflowId to pass of match the criteria of this filter.145 :param close_status: string.146 valid status are ('COMPLETED', 'FAILED', 'CANCELED', 'TERMINATED', 'CONTINUED_AS_NEW', 'TIMED_OUT')147 This filter has an affect only if executionStatus is specified as CLOSED148 :return: total number of closed workflows that meet the filter criteria149 """150 time_filter_dict = _build_time_filter_dict(151 oldest_start_date=oldest_start_date,152 latest_start_date=latest_start_date,153 oldest_close_date=oldest_close_date,154 latest_close_date=latest_close_date,155 )156 workflow_filter_dict = _build_workflow_filter_dict(157 workflow_name=workflow_name,158 version=version,159 tag=tag,160 workflow_id=workflow_id,161 close_status=close_status,162 )163 workflow_filter_dict.update(time_filter_dict)164 response = self.boto_client.count_closed_workflow_executions(165 domain=self.workflow_client_config.domain,166 **workflow_filter_dict167 )168 return CountWorkflowsResult(count=response['count'], truncated=response['truncated'])169def _build_time_filter_dict(oldest_start_date=None, latest_start_date=None, oldest_close_date=None, latest_close_date=None):170 """171 Build time_filter_dict for calls to _count_closed_workflow_executions and _count_open_workflow_executions.172 Return result is a dict: {time_filter_type : filter_dict}173 filter_dict must contains key 'oldestDate'.174 sample input:175 oldest_start_date=datetime(2016, 11, 11)176 return example:177 {178 'startTimeFilter': {...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!