...19import org.openqa.selenium.Beta;20import org.openqa.selenium.Capabilities;21import org.openqa.selenium.ImmutableCapabilities;22import org.openqa.selenium.RetrySessionRequestException;23import org.openqa.selenium.SessionNotCreatedException;24import org.openqa.selenium.WebDriverException;25import org.openqa.selenium.concurrent.Regularly;26import org.openqa.selenium.events.EventBus;27import org.openqa.selenium.grid.config.Config;28import org.openqa.selenium.grid.data.CreateSessionRequest;29import org.openqa.selenium.grid.data.CreateSessionResponse;30import org.openqa.selenium.grid.data.DistributorStatus;31import org.openqa.selenium.grid.data.NewSessionRequestEvent;32import org.openqa.selenium.grid.data.NodeAddedEvent;33import org.openqa.selenium.grid.data.NodeDrainComplete;34import org.openqa.selenium.grid.data.NodeHeartBeatEvent;35import org.openqa.selenium.grid.data.NodeId;36import org.openqa.selenium.grid.data.NodeStatus;37import org.openqa.selenium.grid.data.NodeStatusEvent;38import org.openqa.selenium.grid.data.RequestId;39import org.openqa.selenium.grid.data.SessionRequest;40import org.openqa.selenium.grid.data.SessionRequestCapability;41import org.openqa.selenium.grid.data.Slot;42import org.openqa.selenium.grid.data.SlotId;43import org.openqa.selenium.grid.data.TraceSessionRequest;44import org.openqa.selenium.grid.distributor.Distributor;45import org.openqa.selenium.grid.distributor.config.DistributorOptions;46import org.openqa.selenium.grid.distributor.selector.SlotSelector;47import org.openqa.selenium.grid.log.LoggingOptions;48import org.openqa.selenium.grid.node.HealthCheck;49import org.openqa.selenium.grid.node.Node;50import org.openqa.selenium.grid.node.remote.RemoteNode;51import org.openqa.selenium.grid.security.Secret;52import org.openqa.selenium.grid.security.SecretOptions;53import org.openqa.selenium.grid.server.EventBusOptions;54import org.openqa.selenium.grid.server.NetworkOptions;55import org.openqa.selenium.grid.sessionmap.SessionMap;56import org.openqa.selenium.grid.sessionmap.config.SessionMapOptions;57import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;58import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;59import org.openqa.selenium.internal.Either;60import org.openqa.selenium.internal.Require;61import org.openqa.selenium.remote.SessionId;62import org.openqa.selenium.remote.http.HttpClient;63import org.openqa.selenium.remote.tracing.AttributeKey;64import org.openqa.selenium.remote.tracing.EventAttribute;65import org.openqa.selenium.remote.tracing.EventAttributeValue;66import org.openqa.selenium.remote.tracing.Span;67import org.openqa.selenium.remote.tracing.Status;68import org.openqa.selenium.remote.tracing.Tracer;69import org.openqa.selenium.status.HasReadyState;70import java.io.UncheckedIOException;71import java.time.Duration;72import java.util.ArrayList;73import java.util.Collection;74import java.util.HashMap;75import java.util.List;76import java.util.Map;77import java.util.Optional;78import java.util.Set;79import java.util.concurrent.ConcurrentHashMap;80import java.util.concurrent.Executors;81import java.util.concurrent.locks.Lock;82import java.util.concurrent.locks.ReadWriteLock;83import java.util.concurrent.locks.ReentrantReadWriteLock;84import java.util.logging.Level;85import java.util.logging.Logger;86import java.util.stream.Collectors;87import static com.google.common.collect.ImmutableSet.toImmutableSet;88import static org.openqa.selenium.grid.data.Availability.DOWN;89import static org.openqa.selenium.grid.data.Availability.DRAINING;90import static org.openqa.selenium.internal.Debug.getDebugLogLevel;91import static org.openqa.selenium.remote.RemoteTags.CAPABILITIES;92import static org.openqa.selenium.remote.RemoteTags.CAPABILITIES_EVENT;93import static org.openqa.selenium.remote.RemoteTags.SESSION_ID;94import static org.openqa.selenium.remote.RemoteTags.SESSION_ID_EVENT;95import static org.openqa.selenium.remote.tracing.AttributeKey.SESSION_URI;96import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION;97public class LocalDistributor extends Distributor {98 private static final Logger LOG = Logger.getLogger(LocalDistributor.class.getName());99 private final Tracer tracer;100 private final EventBus bus;101 private final HttpClient.Factory clientFactory;102 private final SessionMap sessions;103 private final SlotSelector slotSelector;104 private final Secret registrationSecret;105 private final Regularly hostChecker = new Regularly("distributor host checker");106 private final Map<NodeId, Runnable> allChecks = new HashMap<>();107 private final Duration healthcheckInterval;108 private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);109 private final GridModel model;110 private final Map<NodeId, Node> nodes;111 private final NewSessionQueue sessionQueue;112 private final Regularly regularly;113 private final boolean rejectUnsupportedCaps;114 public LocalDistributor(115 Tracer tracer,116 EventBus bus,117 HttpClient.Factory clientFactory,118 SessionMap sessions,119 NewSessionQueue sessionQueue,120 SlotSelector slotSelector,121 Secret registrationSecret,122 Duration healthcheckInterval,123 boolean rejectUnsupportedCaps) {124 super(tracer, clientFactory, registrationSecret);125 this.tracer = Require.nonNull("Tracer", tracer);126 this.bus = Require.nonNull("Event bus", bus);127 this.clientFactory = Require.nonNull("HTTP client factory", clientFactory);128 this.sessions = Require.nonNull("Session map", sessions);129 this.sessionQueue = Require.nonNull("New Session Request Queue", sessionQueue);130 this.slotSelector = Require.nonNull("Slot selector", slotSelector);131 this.registrationSecret = Require.nonNull("Registration secret", registrationSecret);132 this.healthcheckInterval = Require.nonNull("Health check interval", healthcheckInterval);133 this.model = new GridModel(bus);134 this.nodes = new ConcurrentHashMap<>();135 this.rejectUnsupportedCaps = rejectUnsupportedCaps;136 bus.addListener(NodeStatusEvent.listener(this::register));137 bus.addListener(NodeStatusEvent.listener(model::refresh));138 bus.addListener(NodeHeartBeatEvent.listener(nodeStatus -> {139 if (nodes.containsKey(nodeStatus.getId())) {140 model.touch(nodeStatus.getId());141 } else {142 register(nodeStatus);143 }144 }));145 regularly = new Regularly(146 Executors.newSingleThreadScheduledExecutor(147 r -> {148 Thread thread = new Thread(r);149 thread.setName("New Session Queue");150 thread.setDaemon(true);151 return thread;152 }));153 NewSessionRunnable newSessionRunnable = new NewSessionRunnable();154 bus.addListener(NodeDrainComplete.listener(this::remove));155 bus.addListener(NewSessionRequestEvent.listener(ignored -> newSessionRunnable.run()));156 regularly.submit(model::purgeDeadNodes, Duration.ofSeconds(30), Duration.ofSeconds(30));157 regularly.submit(newSessionRunnable, Duration.ofSeconds(5), Duration.ofSeconds(5));158 }159 public static Distributor create(Config config) {160 Tracer tracer = new LoggingOptions(config).getTracer();161 EventBus bus = new EventBusOptions(config).getEventBus();162 DistributorOptions distributorOptions = new DistributorOptions(config);163 HttpClient.Factory clientFactory = new NetworkOptions(config).getHttpClientFactory(tracer);164 SessionMap sessions = new SessionMapOptions(config).getSessionMap();165 SecretOptions secretOptions = new SecretOptions(config);166 NewSessionQueue sessionQueue = new NewSessionQueueOptions(config).getSessionQueue(167 "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue");168 return new LocalDistributor(169 tracer,170 bus,171 clientFactory,172 sessions,173 sessionQueue,174 distributorOptions.getSlotSelector(),175 secretOptions.getRegistrationSecret(),176 distributorOptions.getHealthCheckInterval(),177 distributorOptions.shouldRejectUnsupportedCaps());178 }179 @Override180 public boolean isReady() {181 try {182 return ImmutableSet.of(bus, sessions).parallelStream()183 .map(HasReadyState::isReady)184 .reduce(true, Boolean::logicalAnd);185 } catch (RuntimeException e) {186 return false;187 }188 }189 private void register(NodeStatus status) {190 Require.nonNull("Node", status);191 Lock writeLock = lock.writeLock();192 writeLock.lock();193 try {194 if (nodes.containsKey(status.getId())) {195 return;196 }197 Set<Capabilities> capabilities = status.getSlots().stream()198 .map(Slot::getStereotype)199 .map(ImmutableCapabilities::copyOf)200 .collect(toImmutableSet());201 // A new node! Add this as a remote node, since we've not called add202 RemoteNode remoteNode = new RemoteNode(203 tracer,204 clientFactory,205 status.getId(),206 status.getUri(),207 registrationSecret,208 capabilities);209 add(remoteNode);210 } finally {211 writeLock.unlock();212 }213 }214 @Override215 public LocalDistributor add(Node node) {216 Require.nonNull("Node", node);217 LOG.info(String.format("Added node %s at %s.", node.getId(), node.getUri()));218 nodes.put(node.getId(), node);219 model.add(node.getStatus());220 // Extract the health check221 Runnable runnableHealthCheck = asRunnableHealthCheck(node);222 allChecks.put(node.getId(), runnableHealthCheck);223 hostChecker.submit(runnableHealthCheck, healthcheckInterval, Duration.ofSeconds(30));224 bus.fire(new NodeAddedEvent(node.getId()));225 return this;226 }227 private Runnable asRunnableHealthCheck(Node node) {228 HealthCheck healthCheck = node.getHealthCheck();229 NodeId id = node.getId();230 return () -> {231 HealthCheck.Result result;232 try {233 result = healthCheck.check();234 } catch (Exception e) {235 LOG.log(Level.WARNING, "Unable to process node " + id, e);236 result = new HealthCheck.Result(DOWN, "Unable to run healthcheck. Assuming down");237 }238 Lock writeLock = lock.writeLock();239 writeLock.lock();240 try {241 model.setAvailability(id, result.getAvailability());242 } finally {243 writeLock.unlock();244 }245 };246 }247 @Override248 public boolean drain(NodeId nodeId) {249 Node node = nodes.get(nodeId);250 if (node == null) {251 LOG.info("Asked to drain unregistered node " + nodeId);252 return false;253 }254 Lock writeLock = lock.writeLock();255 writeLock.lock();256 try {257 node.drain();258 model.setAvailability(nodeId, DRAINING);259 } finally {260 writeLock.unlock();261 }262 return node.isDraining();263 }264 public void remove(NodeId nodeId) {265 Lock writeLock = lock.writeLock();266 writeLock.lock();267 try {268 model.remove(nodeId);269 Runnable runnable = allChecks.remove(nodeId);270 if (runnable != null) {271 hostChecker.remove(runnable);272 }273 } finally {274 writeLock.unlock();275 }276 }277 @Override278 public DistributorStatus getStatus() {279 Lock readLock = this.lock.readLock();280 readLock.lock();281 try {282 return new DistributorStatus(model.getSnapshot());283 } finally {284 readLock.unlock();285 }286 }287 @Beta288 public void refresh() {289 List<Runnable> allHealthChecks = new ArrayList<>();290 Lock readLock = this.lock.readLock();291 readLock.lock();292 try {293 allHealthChecks.addAll(allChecks.values());294 } finally {295 readLock.unlock();296 }297 allHealthChecks.parallelStream().forEach(Runnable::run);298 }299 protected Set<NodeStatus> getAvailableNodes() {300 Lock readLock = this.lock.readLock();301 readLock.lock();302 try {303 return model.getSnapshot().stream()304 .filter(node -> !DOWN.equals(node.getAvailability()))305 .collect(toImmutableSet());306 } finally {307 readLock.unlock();308 }309 }310 @Override311 public Either<SessionNotCreatedException, CreateSessionResponse> newSession(SessionRequest request)312 throws SessionNotCreatedException {313 Require.nonNull("Requests to process", request);314 Span span = tracer.getCurrentContext().createSpan("distributor.new_session");315 Map<String, EventAttributeValue> attributeMap = new HashMap<>();316 try {317 attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(),318 EventAttribute.setValue(getClass().getName()));319 attributeMap.put("request.payload", EventAttribute.setValue(request.getDesiredCapabilities().toString()));320 String sessionReceivedMessage = "Session request received by the distributor";321 span.addEvent(sessionReceivedMessage, attributeMap);322 LOG.info(String.format("%s: \n %s", sessionReceivedMessage, request.getDesiredCapabilities()));323 // If there are no capabilities at all, something is horribly wrong324 if (request.getDesiredCapabilities().isEmpty()) {325 SessionNotCreatedException exception =326 new SessionNotCreatedException("No capabilities found in session request payload");327 EXCEPTION.accept(attributeMap, exception);328 attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(),329 EventAttribute.setValue("Unable to create session. No capabilities found: " +330 exception.getMessage()));331 span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);332 return Either.left(exception);333 }334 boolean retry = false;335 SessionNotCreatedException lastFailure = new SessionNotCreatedException("Unable to create new session");336 for (Capabilities caps : request.getDesiredCapabilities()) {337 if (!isSupported(caps)) {338 continue;339 }340 // Try and find a slot that we can use for this session. While we341 // are finding the slot, no other session can possibly be started.342 // Therefore, spend as little time as possible holding the write343 // lock, and release it as quickly as possible. Under no344 // circumstances should we try to actually start the session itself345 // in this next block of code.346 SlotId selectedSlot = reserveSlot(request.getRequestId(), caps);347 if (selectedSlot == null) {348 LOG.info(String.format("Unable to find slot for request %s. May retry: %s ", request.getRequestId(), caps));349 retry = true;350 continue;351 }352 CreateSessionRequest singleRequest = new CreateSessionRequest(353 request.getDownstreamDialects(),354 caps,355 request.getMetadata());356 try {357 CreateSessionResponse response = startSession(selectedSlot, singleRequest);358 sessions.add(response.getSession());359 model.setSession(selectedSlot, response.getSession());360 SessionId sessionId = response.getSession().getId();361 Capabilities sessionCaps = response.getSession().getCapabilities();362 String sessionUri = response.getSession().getUri().toString();363 SESSION_ID.accept(span, sessionId);364 CAPABILITIES.accept(span, sessionCaps);365 SESSION_ID_EVENT.accept(attributeMap, sessionId);366 CAPABILITIES_EVENT.accept(attributeMap, sessionCaps);367 span.setAttribute(SESSION_URI.getKey(), sessionUri);368 attributeMap.put(SESSION_URI.getKey(), EventAttribute.setValue(sessionUri));369 String sessionCreatedMessage = "Session created by the distributor";370 span.addEvent(sessionCreatedMessage, attributeMap);371 LOG.info(String.format("%s. Id: %s, Caps: %s", sessionCreatedMessage, sessionId, sessionCaps));372 return Either.right(response);373 } catch (SessionNotCreatedException e) {374 model.setSession(selectedSlot, null);375 lastFailure = e;376 }377 }378 // If we've made it this far, we've not been able to start a session379 if (retry) {380 lastFailure = new RetrySessionRequestException(381 "Will re-attempt to find a node which can run this session",382 lastFailure);383 attributeMap.put(384 AttributeKey.EXCEPTION_MESSAGE.getKey(),385 EventAttribute.setValue("Will retry session " + request.getRequestId()));386 } else {387 EXCEPTION.accept(attributeMap, lastFailure);388 attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(),389 EventAttribute.setValue("Unable to create session: " + lastFailure.getMessage()));390 span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);391 }392 return Either.left(lastFailure);393 } catch (SessionNotCreatedException e) {394 span.setAttribute(AttributeKey.ERROR.getKey(), true);395 span.setStatus(Status.ABORTED);396 EXCEPTION.accept(attributeMap, e);397 attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(),398 EventAttribute.setValue("Unable to create session: " + e.getMessage()));399 span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);400 return Either.left(e);401 } catch (UncheckedIOException e) {402 span.setAttribute(AttributeKey.ERROR.getKey(), true);403 span.setStatus(Status.UNKNOWN);404 EXCEPTION.accept(attributeMap, e);405 attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(),406 EventAttribute.setValue("Unknown error in LocalDistributor while creating session: " + e.getMessage()));407 span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap);408 return Either.left(new SessionNotCreatedException(e.getMessage(), e));409 } finally {410 span.close();411 }412 }413 private CreateSessionResponse startSession(SlotId selectedSlot, CreateSessionRequest singleRequest) {414 Node node = nodes.get(selectedSlot.getOwningNodeId());415 if (node == null) {416 throw new SessionNotCreatedException("Unable to find owning node for slot");417 }418 Either<WebDriverException, CreateSessionResponse> result;419 try {420 result = node.newSession(singleRequest);421 } catch (SessionNotCreatedException e) {422 result = Either.left(e);423 } catch (RuntimeException e) {424 result = Either.left(new SessionNotCreatedException(e.getMessage(), e));425 }426 if (result.isLeft()) {427 WebDriverException exception = result.left();428 if (exception instanceof SessionNotCreatedException) {429 throw exception;430 }431 throw new SessionNotCreatedException(exception.getMessage(), exception);432 }433 return result.right();434 }435 private SlotId reserveSlot(RequestId requestId, Capabilities caps) {436 Lock writeLock = lock.writeLock();437 writeLock.lock();438 try {439 Set<SlotId> slotIds = slotSelector.selectSlot(caps, getAvailableNodes());440 if (slotIds.isEmpty()) {441 LOG.log(442 getDebugLogLevel(),443 String.format("No slots found for request %s and capabilities %s", requestId, caps));444 return null;445 }446 for (SlotId slotId : slotIds) {447 if (reserve(slotId)) {448 return slotId;449 }450 }451 return null;452 } finally {453 writeLock.unlock();454 }455 }456 private boolean isSupported(Capabilities caps) {457 return getAvailableNodes().stream().anyMatch(node -> node.hasCapability(caps));458 }459 private boolean reserve(SlotId id) {460 Require.nonNull("Slot ID", id);461 Lock writeLock = this.lock.writeLock();462 writeLock.lock();463 try {464 Node node = nodes.get(id.getOwningNodeId());465 if (node == null) {466 LOG.log(getDebugLogLevel(), String.format("Unable to find node with id %s", id));467 return false;468 }469 return model.reserve(id);470 } finally {471 writeLock.unlock();472 }473 }474 public void callExecutorShutdown() {475 LOG.info("Shutting down Distributor executor service");476 regularly.shutdown();477 }478 public class NewSessionRunnable implements Runnable {479 @Override480 public void run() {481 List<SessionRequestCapability> queueContents = sessionQueue.getQueueContents();482 if (rejectUnsupportedCaps) {483 checkMatchingSlot(queueContents);484 }485 int initialSize = queueContents.size();486 boolean retry = initialSize != 0;487 while (retry) {488 // We deliberately run this outside of a lock: if we're unsuccessful489 // starting the session, we just put the request back on the queue.490 // This does mean, however, that under high contention, we might end491 // up starving a session request.492 Set<Capabilities> stereotypes =493 getAvailableNodes().stream()494 .filter(NodeStatus::hasCapacity)495 .map(496 node ->497 node.getSlots().stream()498 .map(Slot::getStereotype)499 .collect(Collectors.toSet()))500 .flatMap(Collection::stream)501 .collect(Collectors.toSet());502 Optional<SessionRequest> maybeRequest = sessionQueue.getNextAvailable(stereotypes);503 maybeRequest.ifPresent(this::handleNewSessionRequest);504 int currentSize = sessionQueue.getQueueContents().size();505 retry = currentSize != 0 && currentSize != initialSize;506 initialSize = currentSize;507 }508 }509 private void checkMatchingSlot(List<SessionRequestCapability> sessionRequests) {510 for(SessionRequestCapability request : sessionRequests) {511 long unmatchableCount = request.getDesiredCapabilities().stream()512 .filter(caps -> !isSupported(caps))513 .count();514 if (unmatchableCount == request.getDesiredCapabilities().size()) {515 SessionNotCreatedException exception = new SessionNotCreatedException(516 "No nodes support the capabilities in the request");517 sessionQueue.complete(request.getRequestId(), Either.left(exception));518 }519 }520 }521 private void handleNewSessionRequest(SessionRequest sessionRequest) {522 RequestId reqId = sessionRequest.getRequestId();523 try (Span span = TraceSessionRequest.extract(tracer, sessionRequest).createSpan("distributor.poll_queue")) {524 Map<String, EventAttributeValue> attributeMap = new HashMap<>();525 attributeMap.put(526 AttributeKey.LOGGER_CLASS.getKey(),527 EventAttribute.setValue(getClass().getName()));528 span.setAttribute(AttributeKey.REQUEST_ID.getKey(), reqId.toString());529 attributeMap.put(530 AttributeKey.REQUEST_ID.getKey(),531 EventAttribute.setValue(reqId.toString()));532 attributeMap.put("request", EventAttribute.setValue(sessionRequest.toString()));533 Either<SessionNotCreatedException, CreateSessionResponse> response = newSession(sessionRequest);534 if (response.isLeft() && response.left() instanceof RetrySessionRequestException) {535 try(Span childSpan = span.createSpan("distributor.retry")) {536 LOG.info("Retrying");537 boolean retried = sessionQueue.retryAddToQueue(sessionRequest);538 attributeMap.put("request.retry_add", EventAttribute.setValue(retried));539 childSpan.addEvent("Retry adding to front of queue. No slot available.", attributeMap);540 if (retried) {541 return;542 }543 childSpan.addEvent("retrying_request", attributeMap);544 }545 }546 sessionQueue.complete(reqId, response);547 }...