hadoop GlobalStateIdContext 源码

  • 2022-10-20
  • 浏览 (228)

hadoop GlobalStateIdContext 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;

/**
 * Server-side {@link AlignmentContext} implementation responsible for passing
 * state alignment info to clients.
 *
 * <p>The state id handed to clients is the value returned by
 * {@link #getLastSeenStateId()}. Only {@link ClientProtocol} methods carrying
 * a {@link ReadOnly} annotation whose {@code isCoordinated()} is true are
 * reported as coordinated calls.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class GlobalStateIdContext implements AlignmentContext {
  /**
   * Estimated number of journal transactions a typical NameNode can execute
   * per second. The number is used to estimate how long a client's
   * RPC request will wait in the call queue before the Observer catches up
   * with its state id.
   */
  private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L;

  /**
   * The client wait time on an RPC request is composed of
   * the server execution time plus the communication time.
   * This is an expected fraction of the total wait time spent on
   * server execution.
   */
  private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f;

  /** Source of the current HA state and of the last applied/written txid. */
  private final FSNamesystem namesystem;

  /** Names of the {@link ClientProtocol} methods that are coordinated. */
  private final HashSet<String> coordinatedMethods;

  /**
   * Server side constructor. Collects the names of all coordinated
   * {@link ClientProtocol} methods once, so that
   * {@link #isCoordinatedCall(String, String)} is a plain set lookup.
   *
   * @param namesystem server side state provider
   */
  GlobalStateIdContext(FSNamesystem namesystem) {
    this.namesystem = namesystem;
    this.coordinatedMethods = new HashSet<>();
    // For now, only ClientProtocol methods can be coordinated, so only checking
    // against ClientProtocol.
    for (Method method : ClientProtocol.class.getDeclaredMethods()) {
      // A single lookup yields both "is it annotated" and the annotation.
      final ReadOnly readOnly = method.getAnnotation(ReadOnly.class);
      if (readOnly != null && readOnly.isCoordinated()) {
        // Methods are tracked by name only, so overloads share one entry.
        coordinatedMethods.add(method.getName());
      }
    }
  }

  /**
   * Server side implementation for providing state alignment info in responses.
   *
   * @param header builder of the RPC response header that receives the
   *    server's current state id
   */
  @Override
  public void updateResponseState(RpcResponseHeaderProto.Builder header) {
    // The state id comes from getLastSeenStateId(), which reads
    // getLastAppliedOrWrittenTxId() rather than
    // getCorrectLastAppliedOrWrittenTxId() (see HDFS-14822).
    // The same source is used by receiveRequestState.
    header.setStateId(getLastSeenStateId());
  }

  /**
   * Server side implementation only provides state alignment info.
   * It does not receive state alignment info therefore this does nothing.
   */
  @Override
  public void receiveResponseState(RpcResponseHeaderProto header) {
    // Do nothing.
  }

  /**
   * Server side implementation only receives state alignment info.
   * It does not build RPC requests therefore this does nothing.
   */
  @Override
  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
    // Do nothing.
  }

  /**
   * Server-side implementation for processing state alignment info in
   * requests.
   * For Observer it compares the client and the server states and determines
   * if it makes sense to wait until the server catches up with the client
   * state. If not the server throws RetriableException so that the client
   * could retry the call according to the retry policy with another Observer
   * or the Active NameNode.
   *
   * @param header The RPC request header.
   * @param clientWaitTime time in milliseconds indicating how long client
   *    waits for the server response. It is used to verify if the client's
   *    state is too far ahead of the server's
   * @return the server state id when this node is Active and the client's
   *    state id is ahead of it; otherwise the client's state id.
   * @throws StandbyException if this node is an Observer and the request
   *    carries no state id.
   * @throws RetriableException if Observer is too far behind.
   */
  @Override
  public long receiveRequestState(RpcRequestHeaderProto header,
      long clientWaitTime) throws IOException {
    // Read the HA state once so that every check below is made against the
    // same snapshot, even if the node transitions while this call runs.
    final HAServiceState state = namesystem.getState();

    if (!header.hasStateId() && state == HAServiceState.OBSERVER) {
      // This could happen if client configured with non-observer proxy provider
      // (e.g., ConfiguredFailoverProxyProvider) is accessing a cluster with
      // observers. In this case, we should let the client failover to the
      // active node, rather than potentially serving stale result (client
      // stateId is 0 if not set).
      throw new StandbyException("Observer Node received request without "
          + "stateId. This mostly likely is because client is not configured "
          + "with " + ObserverReadProxyProvider.class.getSimpleName());
    }

    final long serverStateId = getLastSeenStateId();
    final long clientStateId = header.getStateId();
    FSNamesystem.LOG.trace("Client State ID= {} and Server State ID= {}",
        clientStateId, serverStateId);

    if (clientStateId > serverStateId && state == HAServiceState.ACTIVE) {
      FSNamesystem.LOG.warn("The client stateId: {} is greater than "
          + "the server stateId: {} This is unexpected. "
          + "Resetting client stateId to server stateId",
          clientStateId, serverStateId);
      return serverStateId;
    }

    if (state == HAServiceState.OBSERVER) {
      // Number of transactions the Observer is estimated to be able to apply
      // while the client is still waiting. Note that clientWaitTime is
      // truncated to whole seconds here, so a wait shorter than one second
      // yields a zero allowance.
      final float maxTolerableLag = ESTIMATED_TRANSACTIONS_PER_SECOND
          * TimeUnit.MILLISECONDS.toSeconds(clientWaitTime)
          * ESTIMATED_SERVER_TIME_MULTIPLIER;
      if (clientStateId - serverStateId > maxTolerableLag) {
        throw new RetriableException(
            "Observer Node is too far behind: serverStateId = "
                + serverStateId + " clientStateId = " + clientStateId);
      }
    }
    return clientStateId;
  }

  /**
   * Returns the state id this server currently exposes to clients.
   *
   * @return the last applied or written transaction id of the FSImage.
   */
  @Override
  public long getLastSeenStateId() {
    // Should not need to call getCorrectLastAppliedOrWrittenTxId()
    // see HDFS-14822.
    return namesystem.getFSImage().getLastAppliedOrWrittenTxId();
  }

  /**
   * Tells whether the given RPC method is a coordinated call.
   *
   * @param protocolName name of the protocol declaring the method;
   *    a {@code null} value is treated as "not coordinated"
   * @param methodName name of the invoked method
   * @return true only if the protocol is {@link ClientProtocol} and the
   *    method name was registered as coordinated at construction time.
   */
  @Override
  public boolean isCoordinatedCall(String protocolName, String methodName) {
    // Compare from the constant side so a null protocolName cannot throw.
    return ClientProtocol.class.getCanonicalName().equals(protocolName)
        && coordinatedMethods.contains(methodName);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AclEntryStatusFormat 源码

hadoop AclFeature 源码

hadoop AclStorage 源码

hadoop AclTransformation 源码

hadoop AuditLogger 源码

hadoop BackupImage 源码

hadoop BackupJournalManager 源码

hadoop BackupNode 源码

hadoop BackupState 源码

hadoop CacheManager 源码

0  赞