How OpenSearch Selects a Cluster Manager

2022-08-06

OpenSearch is a fork created by AWS based on ElasticSearch 7.X. The new fork continues the Apache 2.0 open-source license and will evolve independently, no longer following ElasticSearch's release cadence.

This article will briefly introduce the cluster manager election process based on OpenSearch's codebase, tag: 2.1.0, commitID: 388c80ad945.

Inheriting the main architecture from ElasticSearch, OpenSearch remains a distributed search engine. The distributed system still requires a "master" node responsible for managing the overall cluster state. This "master" node role is referred to as cluster manager in OpenSearch. Nodes eligible for this role are called cluster manager eligible.

Code Logic

The cluster manager election process in OpenSearch is primarily implemented in the org.opensearch.cluster.coordination.Coordinator class.

When an OpenSearch instance starts, the start method in the org.opensearch.node.Node class calls the startInitialJoin method of the org.opensearch.discovery.Discovery interface to attempt joining the cluster.

At this point, the Node first sets itself to the Candidate role, then uses the clusterBootstrapService within Coordinator to generate the initial voting configuration (VotingConfiguration), calls its own setInitialConfiguration method, and ultimately triggers the startElectionScheduler method to start the election process. The electionScheduler is essentially a JVM thread object that periodically triggers the following flow:

@Override
public void run() {
    synchronized (mutex) {
        if (mode == Mode.CANDIDATE) {
            final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();

            if (localNodeMayWinElection(lastAcceptedState) == false) {
                return;
            }

            final StatusInfo statusInfo = nodeHealthService.getHealth();
            if (statusInfo.getStatus() == UNHEALTHY) {
                return;
            }

            if (prevotingRound != null) {
                prevotingRound.close();
            }
            final List<DiscoveryNode> discoveredNodes = getDiscoveredNodes().stream()
                .filter(n -> isZen1Node(n) == false)
                .collect(Collectors.toList());

            prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
        }
    }
}

The localNodeMayWinElection method first confirms that the local node is eligible to participate in the process; otherwise, it exits immediately. The core judgment logic is whether the node was previously "among the voting participants" or "not among the excluded voters."

The final step is essentially sending a preVoteRequest to all broadcasted nodes. The responses are processed in the handlePreVoteResponse method within org.opensearch.cluster.coordination.PreVoteCollector:

private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
    updateMaxTermSeen.accept(response.getCurrentTerm());
    preVotesReceived.forEach((node, preVoteResponse) -> 
        voteCollection.addJoinVote(...)
    );
    if (electionStrategy.isElectionQuorum(...) == false) {
        return;
    }
    startElection.run();
}
private void startElection() {
    synchronized (mutex) {
        if (mode == Mode.CANDIDATE) {
            if (localNodeMayWinElection(getLastAcceptedState()) == false) {
                return;
            }

            final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
            getDiscoveredNodes().forEach(node -> {
                if (isZen1Node(node) == false) {
                    joinHelper.sendStartJoinRequest(startJoinRequest, node);
                }
            });
        }
    }
}

This logic is straightforward: increment the highest known term value by one to form the next term, and send StartJoinRequest to all known nodes.

The request has been sent out, but how is it processed?

The logic resides in the processJoinRequest method of Coordinator:

private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
    final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
    synchronized (mutex) {
        updateMaxTermSeen(joinRequest.getTerm());

        final CoordinationState coordState = coordinationState.get();
        final boolean prevElectionWon = coordState.electionWon();

        optionalJoin.ifPresent(this::handleJoin);
        joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);

        if (prevElectionWon == false && coordState.electionWon()) {
            becomeLeader("handleJoinRequest");
        }
    }
}

The core processing logic is located at optionalJoin.ifPresent(this::handleJoin). By comparing whether the election was won before and after processing the request, the node calls becomeLeader when necessary to adjust its own state and enter the formal cluster manager role.

Internally, handleJoin essentially calls the handleJoin method of org.opensearch.cluster.coordination.CoordinationState:

public boolean handleJoin(Join join) {
    if (join.getTerm() != getCurrentTerm()) {
        throw new CoordinationStateRejectedException(...);
    }

    if (startedJoinSinceLastReboot == false) {
        throw new CoordinationStateRejectedException(...);
    }

    final long lastAcceptedTerm = getLastAcceptedTerm();
    if (join.getLastAcceptedTerm() > lastAcceptedTerm) {
        throw new CoordinationStateRejectedException(...);
    }

    if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersionOrMetadataVersion()) {
        throw new CoordinationStateRejectedException(...);
    }

    if (getLastAcceptedConfiguration().isEmpty()) {
        throw new CoordinationStateRejectedException(...);
    }

    boolean added = joinVotes.addJoinVote(join);
    boolean prevElectionWon = electionWon;
    electionWon = isElectionQuorum(joinVotes);
    assert !prevElectionWon || electionWon;
    if (electionWon && prevElectionWon == false) {
        lastPublishedVersion = getLastAcceptedVersion();
    }
    return added;
}

As the logic above shows, the receiving node validates the request against its own known term and metadata version. Only when legitimate requests reach a quorum does the metadata version get updated according to the request.

Summary

The preceding sections provided a basic introduction to how OpenSearch sends and processes cluster manager election requests.

Although there are few blog posts specifically about OpenSearch, searching for ElasticSearch-related articles mostly yields claims that ES's election process is based on the Bully algorithm. However, the core election logic of the Bully algorithm is "sort candidate nodes by ID and select the largest." This is not reflected in the code shown above.

Instead, terms like term, version, and quorum keep catching the eye, which inevitably brings to mind: this algorithm is much closer to the Raft consensus algorithm.

ElasticSearch must have made significant updates to its core election algorithm at some point.

References