Best AssertJ code snippet using org.assertj.core.api.Assumptions.testException
Source: OperatorCoordinatorSchedulerTest.java
1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.flink.runtime.operators.coordination;19import org.apache.flink.api.common.JobID;20import org.apache.flink.api.common.typeutils.TypeSerializer;21import org.apache.flink.core.fs.CloseableRegistry;22import org.apache.flink.core.testutils.CommonTestUtils;23import org.apache.flink.metrics.MetricGroup;24import org.apache.flink.runtime.OperatorIDPair;25import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;26import org.apache.flink.runtime.checkpoint.Checkpoints;27import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;28import org.apache.flink.runtime.checkpoint.OperatorState;29import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;30import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;31import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;32import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;33import org.apache.flink.runtime.execution.Environment;34import org.apache.flink.runtime.execution.ExecutionState;35import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;36import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;37import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;38import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;39import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;40import org.apache.flink.runtime.jobgraph.JobGraph;41import org.apache.flink.runtime.jobgraph.JobGraphBuilder;42import org.apache.flink.runtime.jobgraph.JobVertex;43import org.apache.flink.runtime.jobgraph.JobVertexID;44import org.apache.flink.runtime.jobgraph.OperatorID;45import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;46import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;47import org.apache.flink.runtime.messages.Acknowledge;48import org.apache.flink.runtime.query.TaskKvStateRegistry;49import org.apache.flink.runtime.scheduler.DefaultScheduler;50import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;51import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;52import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;53import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;54import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;55import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;56import org.apache.flink.runtime.state.KeyGroupRange;57import org.apache.flink.runtime.state.KeyedStateHandle;58import org.apache.flink.runtime.state.OperatorStateBackend;59import org.apache.flink.runtime.state.OperatorStateHandle;60import org.apache.flink.runtime.state.StateBackend;61import org.apache.flink.runtime.state.StreamStateHandle;62import org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;63import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;64import org.apache.flink.runtime.state.ttl.TtlTimeProvider;65import 
org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;66import org.apache.flink.runtime.testtasks.NoOpInvokable;67import org.apache.flink.testutils.TestingUtils;68import org.apache.flink.testutils.executor.TestExecutorResource;69import org.apache.flink.util.FlinkException;70import org.apache.flink.util.SerializedValue;71import org.apache.flink.util.TestLogger;72import org.apache.flink.util.concurrent.FutureUtils;73import org.junit.After;74import org.junit.ClassRule;75import org.junit.Ignore;76import org.junit.Test;77import javax.annotation.Nonnull;78import javax.annotation.Nullable;79import java.io.ByteArrayOutputStream;80import java.io.IOException;81import java.util.Collection;82import java.util.Collections;83import java.util.Optional;84import java.util.Random;85import java.util.concurrent.CompletableFuture;86import java.util.concurrent.ScheduledExecutorService;87import java.util.function.Consumer;88import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;89import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setExecutionToState;90import static org.assertj.core.api.Assertions.assertThat;91import static org.assertj.core.api.Assertions.assertThatThrownBy;92import static org.assertj.core.api.Assertions.fail;93/**94 * Tests for the integration of the {@link OperatorCoordinator} with the scheduler, to ensure the95 * relevant actions are leading to the right method invocations on the coordinator.96 */97public class OperatorCoordinatorSchedulerTest extends TestLogger {98 private final JobVertexID testVertexId = new JobVertexID();99 private final OperatorID testOperatorId = new OperatorID();100 @ClassRule101 public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =102 TestingUtils.defaultExecutorResource();103 private final ManuallyTriggeredScheduledExecutorService executor =104 new ManuallyTriggeredScheduledExecutorService();105 private DefaultScheduler createdScheduler;106 @After107 
public void shutdownScheduler() throws Exception {108 if (createdScheduler != null) {109 closeScheduler(createdScheduler);110 }111 }112 // ------------------------------------------------------------------------113 // tests for scheduling114 // ------------------------------------------------------------------------115 @Test116 public void testCoordinatorStartedWhenSchedulerStarts() throws Exception {117 final DefaultScheduler scheduler = createAndStartScheduler();118 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);119 assertThat(coordinator.isStarted()).isTrue();120 }121 @Test122 public void testCoordinatorDisposedWhenSchedulerStops() throws Exception {123 final DefaultScheduler scheduler = createAndStartScheduler();124 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);125 closeScheduler(scheduler);126 assertThat(coordinator.isClosed()).isTrue();127 }128 @Test129 public void testFailureToStartPropagatesExceptions() throws Exception {130 final OperatorCoordinator.Provider failingCoordinatorProvider =131 new TestingOperatorCoordinator.Provider(132 testOperatorId, CoordinatorThatFailsInStart::new);133 final DefaultScheduler scheduler = createScheduler(failingCoordinatorProvider);134 try {135 scheduler.startScheduling();136 fail("expected an exception");137 } catch (Exception ignored) {138 // expected139 }140 }141 @Test142 public void testFailureToStartClosesCoordinator() throws Exception {143 final OperatorCoordinator.Provider failingCoordinatorProvider =144 new TestingOperatorCoordinator.Provider(145 testOperatorId, CoordinatorThatFailsInStart::new);146 final DefaultScheduler scheduler = createScheduler(failingCoordinatorProvider);147 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);148 try {149 scheduler.startScheduling();150 } catch (Exception ignored) {151 }152 assertThat(coordinator.isClosed()).isTrue();153 }154 @Test155 public void deployingTaskFailureNotifiesCoordinator() throws Exception 
{156 final DefaultScheduler scheduler = createAndStartScheduler();157 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);158 failTask(scheduler, 1);159 assertThat(coordinator.getFailedTasks()).hasSize(1).containsExactly(1);160 }161 @Test162 public void runningTaskFailureNotifiesCoordinator() throws Exception {163 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();164 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);165 failTask(scheduler, 1);166 assertThat(coordinator.getFailedTasks()).hasSize(1).containsExactly(1);167 }168 @Test169 public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {170 final DefaultScheduler scheduler = createSchedulerWithAllRestartOnFailureAndDeployTasks();171 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);172 failTask(scheduler, 1);173 assertThat(coordinator.getFailedTasks()).hasSize(2).containsExactlyInAnyOrder(0, 1);174 }175 @Test176 public void taskRepeatedFailureNotifyCoordinator() throws Exception {177 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();178 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);179 failAndRestartTask(scheduler, 0);180 failAndRestartTask(scheduler, 0);181 assertThat(coordinator.getFailedTasks()).hasSize(2).containsExactly(0, 0);182 }183 @Test184 public void taskGatewayNotSetBeforeTasksRunning() throws Exception {185 final DefaultScheduler scheduler = createAndStartScheduler();186 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);187 final OperatorCoordinator.SubtaskGateway gateway = coordinator.getSubtaskGateway(0);188 assertThat(gateway).isNull();189 }190 @Test191 public void taskGatewayAvailableWhenTasksRunning() throws Exception {192 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();193 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);194 final OperatorCoordinator.SubtaskGateway gateway = 
coordinator.getSubtaskGateway(0);195 assertThat(gateway).isNotNull();196 }197 @Test198 public void taskTaskManagerFailuresAreReportedBack() throws Exception {199 final DefaultScheduler scheduler =200 createSchedulerAndDeployTasks(new FailingTaskExecutorOperatorEventGateway());201 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);202 final OperatorCoordinator.SubtaskGateway gateway = coordinator.getSubtaskGateway(0);203 final CompletableFuture<?> result = gateway.sendEvent(new TestOperatorEvent());204 executor.triggerAll(); // process event sending205 assertThatThrownBy(result::get).satisfies(anyCauseMatches(TestException.class));206 }207 // THESE TESTS BELOW SHOULD LEGITIMATELY WORK, BUT THE SCHEDULER ITSELF SEEMS TO NOT HANDLE208 // THIS SITUATION AT THE MOMENT209 // WE KEEP THESE TESTS HERE TO ENABLE THEM ONCE THE SCHEDULER'S CONTRACT SUPPORTS THEM210 @Ignore211 @Test212 public void deployingTaskCancellationNotifiesCoordinator() throws Exception {213 final DefaultScheduler scheduler = createAndStartScheduler();214 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);215 cancelTask(scheduler, 1);216 assertThat(coordinator.getFailedTasks()).hasSize(1).containsExactly(1);217 }218 @Ignore219 @Test220 public void runningTaskCancellationNotifiesCoordinator() throws Exception {221 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();222 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);223 cancelTask(scheduler, 0);224 assertThat(coordinator.getFailedTasks()).hasSize(1).containsExactly(0);225 }226 // ------------------------------------------------------------------------227 // tests for checkpointing228 // ------------------------------------------------------------------------229 @Test230 public void testTakeCheckpoint() throws Exception {231 final byte[] checkpointData = new byte[656];232 new Random().nextBytes(checkpointData);233 final DefaultScheduler scheduler = 
createSchedulerAndDeployTasks();234 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);235 final CompletableFuture<CompletedCheckpoint> checkpointFuture =236 triggerCheckpoint(scheduler);237 coordinator.getLastTriggeredCheckpoint().complete(checkpointData);238 executor.triggerAll();239 OperatorEvent event =240 new AcknowledgeCheckpointEvent(coordinator.getLastTriggeredCheckpointId());241 OperatorCoordinatorHolder holder = getCoordinatorHolder(scheduler);242 for (int i = 0; i < holder.currentParallelism(); i++) {243 holder.handleEventFromOperator(i, 0, event);244 }245 acknowledgeCurrentCheckpoint(scheduler);246 final OperatorState state = checkpointFuture.get().getOperatorStates().get(testOperatorId);247 assertThat(getStateHandleContents(state.getCoordinatorState()))248 .containsExactly(checkpointData);249 }250 @Test251 public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {252 final OperatorCoordinator.Provider failingCoordinatorProvider =253 new TestingOperatorCoordinator.Provider(254 testOperatorId, CoordinatorThatFailsCheckpointing::new);255 final DefaultScheduler scheduler =256 createSchedulerAndDeployTasks(failingCoordinatorProvider);257 final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);258 assertThatThrownBy(checkpointFuture::get).satisfies(anyCauseMatches(TestException.class));259 }260 @Test261 public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {262 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();263 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);264 final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);265 final CompletableFuture<?> coordinatorStateFuture =266 coordinator.getLastTriggeredCheckpoint();267 coordinatorStateFuture.completeExceptionally(new TestException());268 waitForCompletionToPropagate(checkpointFuture);269 
assertThatThrownBy(checkpointFuture::get).satisfies(anyCauseMatches(TestException.class));270 }271 @Test272 public void testSavepointRestoresCoordinator() throws Exception {273 final byte[] testCoordinatorState = new byte[123];274 new Random().nextBytes(testCoordinatorState);275 final DefaultScheduler scheduler =276 createSchedulerWithRestoredSavepoint(testCoordinatorState);277 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);278 final byte[] restoredState = coordinator.getLastRestoredCheckpointState();279 assertThat(restoredState).containsExactly(testCoordinatorState);280 }281 @Test282 public void testGlobalFailureResetsToCheckpoint() throws Exception {283 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();284 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);285 final byte[] coordinatorState = new byte[] {7, 11, 3, 5};286 takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);287 failGlobalAndRestart(scheduler, new TestException());288 assertThat(coordinator.getLastRestoredCheckpointState())289 .as("coordinator should have a restored checkpoint")290 .containsExactly(coordinatorState);291 }292 @Test293 public void testGlobalFailureBeforeCheckpointResetsToEmptyState() throws Exception {294 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();295 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);296 failGlobalAndRestart(scheduler, new TestException());297 assertThat(coordinator.getLastRestoredCheckpointState())298 .as("coordinator should have null restored state")299 .isEqualTo(TestingOperatorCoordinator.NULL_RESTORE_VALUE);300 assertThat(coordinator.getLastRestoredCheckpointId())301 .isEqualTo(OperatorCoordinator.NO_CHECKPOINT);302 }303 @Test304 public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {305 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();306 final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);307 takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);308 failGlobalAndRestart(scheduler, new TestException());309 assertThat(coordinator.getRestoredTasks()).isEmpty();310 }311 @Test312 public void testLocalFailoverResetsTask() throws Exception {313 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();314 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);315 final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);316 failAndRestartTask(scheduler, 1);317 assertThat(coordinator.getRestoredTasks()).hasSize(1);318 final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask =319 coordinator.getRestoredTasks().get(0);320 assertThat(restoredTask.subtaskIndex).isEqualTo(1);321 assertThat(restoredTask.checkpointId).isEqualTo(checkpointId);322 }323 @Test324 public void testLocalFailoverBeforeCheckpointResetsTask() throws Exception {325 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();326 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);327 failAndRestartTask(scheduler, 1);328 assertThat(coordinator.getRestoredTasks()).hasSize(1);329 final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask =330 coordinator.getRestoredTasks().get(0);331 assertThat(restoredTask.subtaskIndex).isEqualTo(1);332 assertThat(restoredTask.checkpointId).isEqualTo(OperatorCoordinator.NO_CHECKPOINT);333 }334 @Test335 public void testLocalFailoverDoesNotResetToCheckpoint() throws Exception {336 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();337 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);338 takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});339 failAndRestartTask(scheduler, 0);340 assertThat(coordinator.getLastRestoredCheckpointState())341 .as("coordinator should not have a restored checkpoint")342 .isNull();343 }344 @Test345 public void testConfirmCheckpointComplete() 
throws Exception {346 final DefaultScheduler scheduler = createSchedulerAndDeployTasks();347 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);348 final long checkpointId =349 takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});350 assertThat(coordinator.getLastCheckpointComplete())351 .as("coordinator should be notified of completed checkpoint")352 .isEqualTo(checkpointId);353 }354 // ------------------------------------------------------------------------355 // tests for failover notifications in a batch setup (no checkpoints)356 // ------------------------------------------------------------------------357 @Test358 public void testBatchGlobalFailureResetsToEmptyState() throws Exception {359 final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();360 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);361 failGlobalAndRestart(scheduler, new TestException());362 assertThat(coordinator.getLastRestoredCheckpointState())363 .as("coordinator should have null restored state")364 .isEqualTo(TestingOperatorCoordinator.NULL_RESTORE_VALUE);365 assertThat(coordinator.getLastRestoredCheckpointId())366 .isEqualTo(OperatorCoordinator.NO_CHECKPOINT);367 }368 @Test369 public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {370 final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();371 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);372 failGlobalAndRestart(scheduler, new TestException());373 assertThat(coordinator.getRestoredTasks()).isEmpty();374 }375 @Test376 public void testBatchLocalFailoverResetsTask() throws Exception {377 final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();378 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);379 failAndRestartTask(scheduler, 1);380 assertThat(coordinator.getRestoredTasks()).hasSize(1);381 final 
TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask =382 coordinator.getRestoredTasks().get(0);383 assertThat(restoredTask.subtaskIndex).isEqualTo(1);384 assertThat(restoredTask.checkpointId).isEqualTo(OperatorCoordinator.NO_CHECKPOINT);385 }386 @Test387 public void testBatchLocalFailoverDoesNotResetToCheckpoint() throws Exception {388 final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();389 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);390 failAndRestartTask(scheduler, 0);391 assertThat(coordinator.getLastRestoredCheckpointState())392 .as("coordinator should not have a restored checkpoint")393 .isNull();394 }395 // ------------------------------------------------------------------------396 // tests for REST request delivery397 // ------------------------------------------------------------------------398 @Test399 @SuppressWarnings("unchecked")400 public void testDeliveringClientRequestToRequestHandler() throws Exception {401 final OperatorCoordinator.Provider provider =402 new TestingCoordinationRequestHandler.Provider(testOperatorId);403 final DefaultScheduler scheduler = createScheduler(provider);404 final String payload = "testing payload";405 final TestingCoordinationRequestHandler.Request<String> request =406 new TestingCoordinationRequestHandler.Request<>(payload);407 final TestingCoordinationRequestHandler.Response<String> response =408 (TestingCoordinationRequestHandler.Response<String>)409 scheduler410 .deliverCoordinationRequestToCoordinator(testOperatorId, request)411 .get();412 assertThat(response.getPayload()).isEqualTo(payload);413 }414 @Test415 public void testDeliveringClientRequestToNonRequestHandler() throws Exception {416 final OperatorCoordinator.Provider provider =417 new TestingOperatorCoordinator.Provider(testOperatorId);418 final DefaultScheduler scheduler = createScheduler(provider);419 final String payload = "testing payload";420 final 
TestingCoordinationRequestHandler.Request<String> request =421 new TestingCoordinationRequestHandler.Request<>(payload);422 CommonTestUtils.assertThrows(423 "cannot handle client event",424 FlinkException.class,425 () -> scheduler.deliverCoordinationRequestToCoordinator(testOperatorId, request));426 }427 @Test428 public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception {429 final OperatorCoordinator.Provider provider =430 new TestingOperatorCoordinator.Provider(testOperatorId);431 final DefaultScheduler scheduler = createScheduler(provider);432 final String payload = "testing payload";433 final TestingCoordinationRequestHandler.Request<String> request =434 new TestingCoordinationRequestHandler.Request<>(payload);435 CommonTestUtils.assertThrows(436 "does not exist",437 FlinkException.class,438 () -> scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), request));439 }440 // ------------------------------------------------------------------------441 // test setups442 // ------------------------------------------------------------------------443 private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider)444 throws Exception {445 return setupTestJobAndScheduler(provider);446 }447 private DefaultScheduler createAndStartScheduler() throws Exception {448 final DefaultScheduler scheduler =449 setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));450 scheduler.startScheduling();451 executor.triggerAll();452 // guard test assumptions: this brings tasks into DEPLOYING state453 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0))454 .isEqualTo(ExecutionState.DEPLOYING);455 return scheduler;456 }457 private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {458 return createSchedulerAndDeployTasks(459 new TestingOperatorCoordinator.Provider(testOperatorId));460 }461 private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks()462 
throws Exception {463 final DefaultScheduler scheduler =464 setupTestJobAndScheduler(465 new TestingOperatorCoordinator.Provider(testOperatorId), null, null, true);466 scheduleAllTasksToRunning(scheduler);467 return scheduler;468 }469 private DefaultScheduler createSchedulerWithoutCheckpointingAndDeployTasks() throws Exception {470 final Consumer<JobGraph> noCheckpoints = (jobGraph) -> jobGraph.setSnapshotSettings(null);471 final DefaultScheduler scheduler =472 setupTestJobAndScheduler(473 new TestingOperatorCoordinator.Provider(testOperatorId),474 null,475 noCheckpoints,476 false);477 // guard test assumptions: this must set up a scheduler without checkpoints478 assertThat(scheduler.getExecutionGraph().getCheckpointCoordinator()).isNull();479 scheduleAllTasksToRunning(scheduler);480 return scheduler;481 }482 private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider)483 throws Exception {484 final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);485 scheduleAllTasksToRunning(scheduler);486 return scheduler;487 }488 private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway)489 throws Exception {490 final DefaultScheduler scheduler =491 setupTestJobAndScheduler(492 new TestingOperatorCoordinator.Provider(testOperatorId),493 gateway,494 null,495 false);496 scheduleAllTasksToRunning(scheduler);497 return scheduler;498 }499 private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState)500 throws Exception {501 final byte[] savepointMetadata =502 serializeAsCheckpointMetadata(testOperatorId, coordinatorState);503 final String savepointPointer = "testingSavepointPointer";504 final TestingCheckpointStorageAccessCoordinatorView storage =505 new TestingCheckpointStorageAccessCoordinatorView();506 storage.registerSavepoint(savepointPointer, savepointMetadata);507 final Consumer<JobGraph> savepointConfigurer =508 (jobGraph) -> {509 
SchedulerTestingUtils.enableCheckpointing(510 jobGraph, new ModernStateBackend(), storage.asCheckpointStorage());511 jobGraph.setSavepointRestoreSettings(512 SavepointRestoreSettings.forPath(savepointPointer));513 };514 final DefaultScheduler scheduler =515 setupTestJobAndScheduler(516 new TestingOperatorCoordinator.Provider(testOperatorId),517 null,518 savepointConfigurer,519 false);520 scheduler.startScheduling();521 return scheduler;522 }523 private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider)524 throws Exception {525 return setupTestJobAndScheduler(provider, null, null, false);526 }527 private DefaultScheduler setupTestJobAndScheduler(528 OperatorCoordinator.Provider provider,529 @Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,530 @Nullable Consumer<JobGraph> jobGraphPreProcessing,531 boolean restartAllOnFailover)532 throws Exception {533 final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());534 final JobVertex vertex =535 new JobVertex(536 "Vertex with OperatorCoordinator",537 testVertexId,538 Collections.singletonList(opIds));539 vertex.setInvokableClass(NoOpInvokable.class);540 vertex.addOperatorCoordinator(new SerializedValue<>(provider));541 vertex.setParallelism(2);542 final JobGraph jobGraph =543 JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(vertex).build();544 SchedulerTestingUtils.enableCheckpointing(jobGraph);545 if (jobGraphPreProcessing != null) {546 jobGraphPreProcessing.accept(jobGraph);547 }548 final ComponentMainThreadExecutor mainThreadExecutor =549 new ComponentMainThreadExecutorServiceAdapter(550 (ScheduledExecutorService) executor, Thread.currentThread());551 final DefaultSchedulerBuilder schedulerBuilder =552 taskExecutorOperatorEventGateway == null553 ? 
createSchedulerBuilder(554 jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())555 : createSchedulerBuilder(556 jobGraph,557 mainThreadExecutor,558 taskExecutorOperatorEventGateway,559 EXECUTOR_RESOURCE.getExecutor());560 if (restartAllOnFailover) {561 schedulerBuilder.setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory());562 }563 final DefaultScheduler scheduler =564 schedulerBuilder.setFutureExecutor(executor).setDelayExecutor(executor).build();565 this.createdScheduler = scheduler;566 return scheduler;567 }568 private static DefaultSchedulerBuilder createSchedulerBuilder(569 JobGraph jobGraph,570 ComponentMainThreadExecutor mainThreadExecutor,571 ScheduledExecutorService scheduledExecutorService) {572 return createSchedulerBuilder(573 jobGraph,574 mainThreadExecutor,575 new SimpleAckingTaskManagerGateway(),576 scheduledExecutorService);577 }578 private static DefaultSchedulerBuilder createSchedulerBuilder(579 JobGraph jobGraph,580 ComponentMainThreadExecutor mainThreadExecutor,581 TaskExecutorOperatorEventGateway operatorEventGateway,582 ScheduledExecutorService scheduledExecutorService) {583 final TaskManagerGateway gateway =584 operatorEventGateway instanceof TaskManagerGateway585 ? 
(TaskManagerGateway) operatorEventGateway586 : new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway);587 return createSchedulerBuilder(588 jobGraph, mainThreadExecutor, gateway, scheduledExecutorService);589 }590 private static DefaultSchedulerBuilder createSchedulerBuilder(591 JobGraph jobGraph,592 ComponentMainThreadExecutor mainThreadExecutor,593 TaskManagerGateway taskManagerGateway,594 ScheduledExecutorService executorService) {595 return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService)596 .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory())597 .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))598 .setExecutionSlotAllocatorFactory(599 new TestExecutionSlotAllocatorFactory(taskManagerGateway));600 }601 private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {602 scheduler.startScheduling();603 executor.triggerAll();604 executor.triggerScheduledTasks();605 SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);606 // guard test assumptions: this brings tasks into RUNNING state607 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0))608 .isEqualTo(ExecutionState.RUNNING);609 // trigger actions depending on the switch to running, like the notifications610 // that the task is reads and the task gateway setup611 executor.triggerAll();612 }613 private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {614 OperatorCoordinatorHolder holder = getCoordinatorHolder(scheduler);615 final OperatorCoordinator coordinator = holder.coordinator();616 assertThat(coordinator).isInstanceOf(TestingOperatorCoordinator.class);617 return (TestingOperatorCoordinator) coordinator;618 }619 private OperatorCoordinatorHolder getCoordinatorHolder(DefaultScheduler scheduler) {620 final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);621 assertThat(vertexWithCoordinator).as("vertex for coordinator not 
found").isNotNull();622 final Optional<OperatorCoordinatorHolder> coordinatorOptional =623 vertexWithCoordinator.getOperatorCoordinators().stream()624 .filter((holder) -> holder.operatorId().equals(testOperatorId))625 .findFirst();626 assertThat(coordinatorOptional).as("vertex does not contain coordinator").isPresent();627 return coordinatorOptional.get();628 }629 // ------------------------------------------------------------------------630 // test actions631 // ------------------------------------------------------------------------632 private void failTask(DefaultScheduler scheduler, int subtask) {633 SchedulerTestingUtils.failExecution(scheduler, testVertexId, subtask);634 executor.triggerAll();635 // guard the test assumptions: This must not lead to a restart, but must keep the task in636 // FAILED state637 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask))638 .isEqualTo(ExecutionState.FAILED);639 }640 private void failAndRedeployTask(DefaultScheduler scheduler, int subtask) {641 failTask(scheduler, subtask);642 executor.triggerAll();643 executor.triggerScheduledTasks();644 executor.triggerAll();645 // guard the test assumptions: This must lead to a restarting and redeploying646 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask))647 .isEqualTo(ExecutionState.DEPLOYING);648 }649 private void failAndRestartTask(DefaultScheduler scheduler, int subtask) {650 failAndRedeployTask(scheduler, subtask);651 setExecutionToState(ExecutionState.INITIALIZING, scheduler, testVertexId, subtask);652 setExecutionToState(ExecutionState.RUNNING, scheduler, testVertexId, subtask);653 // guard the test assumptions: This must bring the task back to RUNNING654 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask))655 .isEqualTo(ExecutionState.RUNNING);656 }657 private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason)658 throws InterruptedException {659 
scheduler.handleGlobalFailure(reason);660 SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);661 // make sure the checkpoint is no longer triggering (this means that the subtask662 // gateway has been closed)663 final CheckpointCoordinator checkpointCoordinator =664 scheduler.getExecutionGraph().getCheckpointCoordinator();665 while (checkpointCoordinator != null && checkpointCoordinator.isTriggering()) {666 Thread.sleep(1);667 }668 // make sure we propagate all asynchronous and delayed actions669 executor.triggerAll();670 executor.triggerScheduledTasks();671 executor.triggerAll();672 SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);673 executor.triggerAll();674 // guard the test assumptions: This must bring the tasks back to RUNNING675 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0))676 .isEqualTo(ExecutionState.RUNNING);677 }678 private void cancelTask(DefaultScheduler scheduler, int subtask) {679 SchedulerTestingUtils.canceledExecution(scheduler, testVertexId, subtask);680 executor.triggerAll();681 // guard the test assumptions: This must not lead to a restart, but must keep the task in682 // CANCELED state683 assertThat(SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask))684 .isEqualTo(ExecutionState.CANCELED);685 }686 private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler)687 throws Exception {688 final CompletableFuture<CompletedCheckpoint> future =689 SchedulerTestingUtils.triggerCheckpoint(scheduler);690 final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);691 // the Checkpoint Coordinator executes parts of the logic in its timer thread, and delegates692 // some calls693 // to the scheduler executor. 
so we need to do a mix of waiting for the timer thread and694 // working off695 // tasks in the scheduler executor.696 // we can drop this here once the CheckpointCoordinator also runs in a 'main thread697 // executor'.698 while (!(coordinator.hasTriggeredCheckpoint() || future.isDone())) {699 executor.triggerAll();700 Thread.sleep(1);701 }702 return future;703 }704 private void waitForCompletionToPropagate(CompletableFuture<?> checkpointFuture) {705 // this part is necessary because the user/application-code-driven coordinator706 // forwards the checkpoint to the scheduler thread, which in turn needs to finish707 // work708 while (!checkpointFuture.isDone()) {709 executor.triggerAll();710 try {711 Thread.sleep(1);712 } catch (InterruptedException e) {713 throw new Error(e);714 }715 }716 }717 private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {718 executor.triggerAll();719 SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);720 executor.triggerAll();721 }722 private long takeCompleteCheckpoint(723 DefaultScheduler scheduler,724 TestingOperatorCoordinator testingOperatorCoordinator,725 byte[] coordinatorState)726 throws Exception {727 final CompletableFuture<CompletedCheckpoint> checkpointFuture =728 triggerCheckpoint(scheduler);729 testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(coordinatorState);730 executor.triggerAll();731 OperatorEvent event =732 new AcknowledgeCheckpointEvent(733 testingOperatorCoordinator.getLastTriggeredCheckpointId());734 OperatorCoordinatorHolder holder = getCoordinatorHolder(scheduler);735 for (int i = 0; i < holder.currentParallelism(); i++) {736 holder.handleEventFromOperator(i, 0, event);737 }738 acknowledgeCurrentCheckpoint(scheduler);739 // wait until checkpoint has completed740 final long checkpointId = checkpointFuture.get().getCheckpointID();741 // now wait until it has been acknowledged742 while (!testingOperatorCoordinator.hasCompleteCheckpoint()) {743 executor.triggerAll();744 
Thread.sleep(1);745 }746 return checkpointId;747 }748 private void closeScheduler(DefaultScheduler scheduler) throws Exception {749 final CompletableFuture<Void> closeFuture = scheduler.closeAsync();750 executor.triggerAll();751 closeFuture.get();752 }753 // ------------------------------------------------------------------------754 // miscellaneous utilities755 // ------------------------------------------------------------------------756 private static ExecutionJobVertex getJobVertex(757 DefaultScheduler scheduler, JobVertexID jobVertexId) {758 final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);759 return scheduler.getExecutionVertex(id).getJobVertex();760 }761 private static OperatorState createOperatorState(OperatorID id, byte[] coordinatorState) {762 final OperatorState state = new OperatorState(id, 10, 16384);763 state.setCoordinatorState(new ByteStreamStateHandle("name", coordinatorState));764 return state;765 }766 private static byte[] serializeAsCheckpointMetadata(OperatorID id, byte[] coordinatorState)767 throws IOException {768 final OperatorState state = createOperatorState(id, coordinatorState);769 final CheckpointMetadata metadata =770 new CheckpointMetadata(771 1337L, Collections.singletonList(state), Collections.emptyList());772 final ByteArrayOutputStream out = new ByteArrayOutputStream();773 Checkpoints.storeCheckpointMetadata(metadata, out);774 return out.toByteArray();775 }776 private static byte[] getStateHandleContents(StreamStateHandle stateHandle) {777 if (stateHandle instanceof ByteStreamStateHandle) {778 return ((ByteStreamStateHandle) stateHandle).getData();779 }780 fail("other state handles not implemented");781 return null;782 }783 // ------------------------------------------------------------------------784 // test mocks785 // ------------------------------------------------------------------------786 private static final class TestOperatorEvent implements OperatorEvent {}787 private static final class TestException 
extends Exception {}788 private static final class CoordinatorThatFailsInStart extends TestingOperatorCoordinator {789 public CoordinatorThatFailsInStart(Context context) {790 super(context);791 }792 @Override793 public void start() throws Exception {794 throw new Exception("test failure");795 }796 }797 private static final class CoordinatorThatFailsCheckpointing798 extends TestingOperatorCoordinator {799 public CoordinatorThatFailsCheckpointing(Context context) {800 super(context);801 }802 @Override803 public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {804 throw new Error(new TestException());805 }806 }807 private static final class FailingTaskExecutorOperatorEventGateway808 implements TaskExecutorOperatorEventGateway {809 @Override810 public CompletableFuture<Acknowledge> sendOperatorEventToTask(811 ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {812 return FutureUtils.completedExceptionally(new TestException());813 }814 }815 private static class ModernStateBackend implements StateBackend {816 @Override817 public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(818 Environment env,819 JobID jobID,820 String operatorIdentifier,821 TypeSerializer<K> keySerializer,822 int numberOfKeyGroups,823 KeyGroupRange keyGroupRange,824 TaskKvStateRegistry kvStateRegistry,825 TtlTimeProvider ttlTimeProvider,826 MetricGroup metricGroup,827 @Nonnull Collection<KeyedStateHandle> stateHandles,828 CloseableRegistry cancelStreamRegistry)829 throws Exception {830 throw new UnsupportedOperationException();831 }832 @Override833 public OperatorStateBackend createOperatorStateBackend(834 Environment env,835 String operatorIdentifier,836 @Nonnull Collection<OperatorStateHandle> stateHandles,837 CloseableRegistry cancelStreamRegistry)838 throws Exception {839 throw new UnsupportedOperationException();840 }841 }842 private static final class TaskExecutorOperatorEventGatewayAdapter843 extends 
SimpleAckingTaskManagerGateway {844 private final TaskExecutorOperatorEventGateway operatorGateway;845 private TaskExecutorOperatorEventGatewayAdapter(846 TaskExecutorOperatorEventGateway operatorGateway) {847 this.operatorGateway = operatorGateway;848 }849 @Override850 public CompletableFuture<Acknowledge> sendOperatorEventToTask(851 ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> evt) {852 return operatorGateway.sendOperatorEventToTask(task, operator, evt);853 }854 }855}...
testException
Using AI Code Generation
1import static org.assertj.core.api.Assumptions.assumeThat;2import static org.assertj.core.api.Assertions.assertThat;3import org.junit.jupiter.api.Test;4public class AssumptionsTest {5 public void testAssumptions() {6 assumeThat("ABC").isNotNull();7 assumeThat("ABC").startsWith("A");8 assumeThat("ABC").endsWith("C");9 assertThat("ABC").isEqualTo("ABC");10 }11 public void testAssumptionsWithException() {12 assumeThat("ABC").isNotNull();13 assumeThat("ABC").startsWith("A");14 assumeThat("ABC").endsWith("C");15 assertThat("ABC").isEqualTo("ABC");16 }17}18[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ assumptions ---19[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ assumptions ---20[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ assumptions ---21[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ assumptions ---22[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ assumptions ---
testException
Using AI Code Generation
// The same String check through the plain and the BDD-style AssertJ entry points.
// Assumptions skip the test when they do not hold; assertions fail it.
Assumptions.assumeThat( "abc" ).contains( "a" );
Assertions.assertThat( "abc" ).contains( "a" );
// BDD aliases: given(...) for assumptions, then(...) for assertions.
BDDAssumptions.given( "abc" ).contains( "a" );
BDDAssertions.then( "abc" ).contains( "a" );
testException
Using AI Code Generation
1public class AssumptionsTest {2 public void testAssumption() {3 Assumptions.assumeThat("test").isNotNull();4 }5 public void testAssumptionWithException() {6 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test");7 }8 public void testAssumptionWithExceptionWithMessage() {9 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("Assumption failed");10 }11 public void testAssumptionWithExceptionWithMessageAndMessageSupplier() {12 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("Assumption failed", "test");13 }14 public void testAssumptionWithExceptionWithMessageSupplier() {15 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("test");16 }17 public void testAssumptionWithExceptionWithMessageSupplierAndMessageSupplier() {18 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("test", "test");19 }20 public void testAssumptionWithExceptionWithMessageSupplierAndMessage() {21 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("test", "Assumption failed");22 }23 public void testAssumptionWithExceptionWithMessageSupplierAndMessageSupplierAndMessage() {24 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("test", "test", "Assumption failed");25 }26 public void testAssumptionWithExceptionWithMessageSupplierAndMessageAndMessageSupplier() {27 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("test", "Assumption failed", "test");28 }29 public void testAssumptionWithExceptionWithMessageAndMessageSupplierAndMessageSupplier() {30 Assumptions.assumeThat("test").isNotNull().isNotEmpty().isEqualTo("test").as("Assumption failed", "test", "test");31 }32 public void testAssumptionWithExceptionWithMessageAndMessageAndMessageSupplier() {
testException
Using AI Code Generation
1import static org.assertj.core.api.Assumptions.*;2import org.junit.Test;3public class TestException {4 public void testException() {5 testException(() -> {6 throw new RuntimeException("test");7 }).isInstanceOf(RuntimeException.class);8 }9}
testException
Using AI Code Generation
1Assumptions.assumeThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");2Assertions.assertThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");3Assumptions.assumeThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");4Assertions.assertThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");5Assumptions.assumeThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");6Assertions.assertThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");7Assumptions.assumeThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");8Assertions.assertThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");9Assumptions.assumeThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");10Assertions.assertThatExceptionOfType(exception).isThrownBy(() -> { throw new IllegalArgumentException(); }).hasMessage("some message");
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!