hadoop FederationMembershipStateStoreInputValidator 源码

  • 2022-10-20
  • 浏览 (183)

haddop FederationMembershipStateStoreInputValidator 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.federation.store.utils;

import java.net.URI;

import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class to validate the inputs to
 * {@code FederationMembershipStateStore}, allows a fail fast mechanism for
 * invalid user inputs.
 *
 */
public final class FederationMembershipStateStoreInputValidator {

  private static final Logger LOG = LoggerFactory
      .getLogger(FederationMembershipStateStoreInputValidator.class);

  private FederationMembershipStateStoreInputValidator() {
  }

  /**
   * Quick validation on the input to check some obvious fail conditions (fail
   * fast). Check if the provided {@link SubClusterRegisterRequest} for
   * registration a new subcluster is valid or not.
   *
   * @param request the {@link SubClusterRegisterRequest} to validate against
   * @throws FederationStateStoreInvalidInputException if the request is invalid
   */
  public static void validate(SubClusterRegisterRequest request)
      throws FederationStateStoreInvalidInputException {

    // check if the request is present
    if (request == null) {
      String message = "Missing SubClusterRegister Request."
          + " Please try again by specifying a"
          + " SubCluster Register Information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);

    }

    // validate subcluster info
    checkSubClusterInfo(request.getSubClusterInfo());
  }

  /**
   * Quick validation on the input to check some obvious fail conditions (fail
   * fast). Check if the provided {@link SubClusterDeregisterRequest} for
   * deregistration a subcluster is valid or not.
   *
   * @param request the {@link SubClusterDeregisterRequest} to validate against
   * @throws FederationStateStoreInvalidInputException if the request is invalid
   */
  public static void validate(SubClusterDeregisterRequest request)
      throws FederationStateStoreInvalidInputException {

    // check if the request is present
    if (request == null) {
      String message = "Missing SubClusterDeregister Request."
          + " Please try again by specifying a"
          + " SubCluster Deregister Information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }

    // validate subcluster id
    checkSubClusterId(request.getSubClusterId());
    // validate subcluster state
    checkSubClusterState(request.getState());
    if (!request.getState().isFinal()) {
      String message = "Invalid non-final state: " + request.getState();
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
  }

  /**
   * Quick validation on the input to check some obvious fail conditions (fail
   * fast). Check if the provided {@link SubClusterHeartbeatRequest} for
   * heartbeating a subcluster is valid or not.
   *
   * @param request the {@link SubClusterHeartbeatRequest} to validate against
   * @throws FederationStateStoreInvalidInputException if the request is invalid
   */
  public static void validate(SubClusterHeartbeatRequest request)
      throws FederationStateStoreInvalidInputException {

    // check if the request is present
    if (request == null) {
      String message = "Missing SubClusterHeartbeat Request."
          + " Please try again by specifying a"
          + " SubCluster Heartbeat Information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }

    // validate subcluster id
    checkSubClusterId(request.getSubClusterId());
    // validate last heartbeat timestamp
    checkTimestamp(request.getLastHeartBeat());
    // validate subcluster capability
    checkCapability(request.getCapability());
    // validate subcluster state
    checkSubClusterState(request.getState());

  }

  /**
   * Quick validation on the input to check some obvious fail conditions (fail
   * fast). Check if the provided {@link GetSubClusterInfoRequest} for querying
   * subcluster's information is valid or not.
   *
   * @param request the {@link GetSubClusterInfoRequest} to validate against
   * @throws FederationStateStoreInvalidInputException if the request is invalid
   */
  public static void validate(GetSubClusterInfoRequest request)
      throws FederationStateStoreInvalidInputException {

    // check if the request is present
    if (request == null) {
      String message = "Missing GetSubClusterInfo Request."
          + " Please try again by specifying a Get SubCluster information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }

    // validate subcluster id
    checkSubClusterId(request.getSubClusterId());
  }

  /**
   * Validate if all the required fields on {@link SubClusterInfo} are present
   * or not. {@code Capability} will be empty as the corresponding
   * {@code ResourceManager} is in the process of initialization during
   * registration.
   *
   * @param subClusterInfo the information of the subcluster to be verified
   * @throws FederationStateStoreInvalidInputException if the SubCluster Info
   *           are invalid
   */
  public static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
      throws FederationStateStoreInvalidInputException {
    if (subClusterInfo == null) {
      String message = "Missing SubCluster Information."
          + " Please try again by specifying SubCluster Information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }

    // validate subcluster id
    checkSubClusterId(subClusterInfo.getSubClusterId());

    // validate AMRM Service address
    checkAddress(subClusterInfo.getAMRMServiceAddress());
    // validate ClientRM Service address
    checkAddress(subClusterInfo.getClientRMServiceAddress());
    // validate RMClient Service address
    checkAddress(subClusterInfo.getRMAdminServiceAddress());
    // validate RMWeb Service address
    checkAddress(subClusterInfo.getRMWebServiceAddress());

    // validate last heartbeat timestamp
    checkTimestamp(subClusterInfo.getLastHeartBeat());
    // validate last start timestamp
    checkTimestamp(subClusterInfo.getLastStartTime());

    // validate subcluster state
    checkSubClusterState(subClusterInfo.getState());

  }

  /**
   * Validate if the timestamp is positive or not.
   *
   * @param timestamp the timestamp to be verified
   * @throws FederationStateStoreInvalidInputException if the timestamp is
   *           invalid
   */
  private static void checkTimestamp(long timestamp)
      throws FederationStateStoreInvalidInputException {
    if (timestamp < 0) {
      String message = "Invalid timestamp information."
          + " Please try again by specifying valid Timestamp Information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
  }

  /**
   * Validate if the Capability is present or not.
   *
   * @param capability the capability of the subcluster to be verified
   * @throws FederationStateStoreInvalidInputException if the capability is
   *           invalid
   */
  private static void checkCapability(String capability)
      throws FederationStateStoreInvalidInputException {
    if (capability == null || capability.isEmpty()) {
      String message = "Invalid capability information."
          + " Please try again by specifying valid Capability Information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
  }

  /**
   * Validate if the SubCluster Id is present or not.
   *
   * @param subClusterId the identifier of the subcluster to be verified
   * @throws FederationStateStoreInvalidInputException if the SubCluster Id is
   *           invalid
   */
  protected static void checkSubClusterId(SubClusterId subClusterId)
      throws FederationStateStoreInvalidInputException {
    // check if cluster id is present
    if (subClusterId == null) {
      String message = "Missing SubCluster Id information."
          + " Please try again by specifying Subcluster Id information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
    // check if cluster id is valid
    if (subClusterId.getId().isEmpty()) {
      String message = "Invalid SubCluster Id information."
          + " Please try again by specifying valid Subcluster Id.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
  }

  /**
   * Validate if the SubCluster Address is a valid URL or not.
   *
   * @param address the endpoint of the subcluster to be verified
   * @throws FederationStateStoreInvalidInputException if the address is invalid
   */
  private static void checkAddress(String address)
      throws FederationStateStoreInvalidInputException {
    // Ensure url is not null
    if (address == null || address.isEmpty()) {
      String message = "Missing SubCluster Endpoint information."
          + " Please try again by specifying SubCluster Endpoint information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
    // Validate url is well formed
    boolean hasScheme = address.contains("://");
    URI uri = null;
    try {
      uri = hasScheme ? URI.create(address)
          : URI.create("dummyscheme://" + address);
    } catch (IllegalArgumentException e) {
      String message = "The provided SubCluster Endpoint does not contain a"
          + " valid host:port authority: " + address;
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
    String host = uri.getHost();
    int port = uri.getPort();
    String path = uri.getPath();
    if ((host == null) || (port < 0)
        || (!hasScheme && path != null && !path.isEmpty())) {
      String message = "The provided SubCluster Endpoint does not contain a"
          + " valid host:port authority: " + address;
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
  }

  /**
   * Validate if the SubCluster State is present or not.
   *
   * @param state the state of the subcluster to be verified
   * @throws FederationStateStoreInvalidInputException if the SubCluster State
   *           is invalid
   */
  private static void checkSubClusterState(SubClusterState state)
      throws FederationStateStoreInvalidInputException {
    // check sub-cluster state is not empty
    if (state == null) {
      String message = "Missing SubCluster State information."
          + " Please try again by specifying SubCluster State information.";
      LOG.warn(message);
      throw new FederationStateStoreInvalidInputException(message);
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop FederationApplicationHomeSubClusterStoreInputValidator 源码

hadoop FederationPolicyStoreInputValidator 源码

hadoop FederationReservationHomeSubClusterStoreInputValidator 源码

hadoop FederationStateStoreUtils 源码

hadoop package-info 源码

0  赞