hadoop ZookeeperFederationStateStore 源码

  • 2022-10-20
  • 浏览 (168)

hadoop ZookeeperFederationStateStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.federation.store.impl;

import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;

import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;

/**
 * ZooKeeper implementation of {@link FederationStateStore}.
 *
 * The znode structure is as follows:
 * ROOT_DIR_PATH
 * |--- MEMBERSHIP
 * |     |----- SC1
 * |     |----- SC2
 * |--- APPLICATION
 * |     |----- APP1
 * |     |----- APP2
 * |--- POLICY
 * |     |----- QUEUE1
 * |     |----- QUEUE2
 * |--- RESERVATION
 * |     |----- RESERVATION1
 * |     |----- RESERVATION2
 */
public class ZookeeperFederationStateStore implements FederationStateStore {

  private static final Logger LOG =
      LoggerFactory.getLogger(ZookeeperFederationStateStore.class);

  // Names of the per-entity child znodes created under baseZNode in init().
  // They become part of every stored path, so renaming one would presumably
  // orphan data already written under the old name.
  private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
  private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
  private final static String ROOT_ZNODE_NAME_POLICY = "policies";
  // NOTE: singular, unlike the names above; left as-is for the path
  // compatibility reason given above.
  private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";

  /** Interface to Zookeeper. */
  private ZKCuratorManager zkManager;

  /** Directory to store the state store data. */
  private String baseZNode;

  // Full paths of the per-entity parent znodes, derived from baseZNode in
  // init() via getNodePath().
  private String appsZNode;
  private String membershipZNode;
  private String policiesZNode;
  private String reservationsZNode;
  // Upper bound on the number of mappings returned by
  // getApplicationsHomeSubCluster(); read from configuration in init().
  private int maxAppsInStateStore;

  // Source of the start/end timestamps passed to opDurations.
  private volatile Clock clock = SystemClock.getInstance();

  // Records how long each store operation took (start/end from clock).
  @VisibleForTesting
  private ZKFederationStateStoreOpDurations opDurations =
      ZKFederationStateStoreOpDurations.getInstance();

  /**
   * Initializes the store from the given configuration.
   *
   * <p>Reads the maximum number of applications returned by
   * {@link #getApplicationsHomeSubCluster} and the parent znode path, starts
   * the ZooKeeper client and creates the base znodes for memberships,
   * applications, policies and reservations.
   *
   * @param conf configuration providing these settings and the ZooKeeper
   *        connection/ACL details.
   * @throws YarnException if the ZooKeeper client cannot be created/started
   *         or the base znodes cannot be created.
   */
  @Override
  public void init(Configuration conf) throws YarnException {
    LOG.info("Initializing ZooKeeper connection");

    maxAppsInStateStore = conf.getInt(
        YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);

    baseZNode = conf.get(
        YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
    try {
      this.zkManager = new ZKCuratorManager(conf);
      this.zkManager.start();
    } catch (IOException e) {
      // Fail fast with the real cause. Continuing without a started client
      // only fails later with a less clear error (e.g. an NPE on zkManager
      // below when the constructor itself threw).
      LOG.error("Cannot initialize the ZK connection", e);
      String errMsg = "Cannot initialize the ZK connection: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Base znodes
    membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
    appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
    policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
    reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);

    // Create base znode for each entity
    try {
      List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
      zkManager.createRootDirRecursively(membershipZNode, zkAcl);
      zkManager.createRootDirRecursively(appsZNode, zkAcl);
      zkManager.createRootDirRecursively(policiesZNode, zkAcl);
      zkManager.createRootDirRecursively(reservationsZNode, zkAcl);
    } catch (Exception e) {
      String errMsg = "Cannot create base directories: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
  }

  /**
   * Closes the ZooKeeper client, if one was created by {@link #init}.
   *
   * @throws Exception if closing the client fails.
   */
  @Override
  public void close() throws Exception {
    // Nothing to release when init() never created the client.
    if (zkManager == null) {
      return;
    }
    zkManager.close();
  }

  /**
   * Adds an application to home sub-cluster mapping unless one already exists.
   *
   * @param request request carrying the mapping to add.
   * @return response with the home sub-cluster actually stored, which is the
   *         pre-existing one if the application was already mapped.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
      AddApplicationHomeSubClusterRequest request) throws YarnException {

    final long startTime = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    final ApplicationHomeSubCluster mapping =
        request.getApplicationHomeSubCluster();
    final ApplicationId applicationId = mapping.getApplicationId();

    // update=false: an already existing mapping is left untouched.
    SubClusterId storedHome = mapping.getHomeSubCluster();
    try {
      putApp(applicationId, storedHome, false);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot add application home subcluster for " + applicationId);
    }

    // Read back what is stored, which may be an earlier mapping.
    try {
      storedHome = getApp(applicationId);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot check app home subcluster for " + applicationId);
    }
    opDurations.addAppHomeSubClusterDuration(startTime, clock.getTime());
    return AddApplicationHomeSubClusterResponse.newInstance(storedHome);
  }

  /**
   * Overwrites the home sub-cluster of an already mapped application.
   *
   * @param request request carrying the application and its new home.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the application is not
   *         mapped yet, or ZooKeeper fails.
   */
  @Override
  public UpdateApplicationHomeSubClusterResponse
      updateApplicationHomeSubCluster(
          UpdateApplicationHomeSubClusterRequest request)
              throws YarnException {

    final long startTime = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    final ApplicationHomeSubCluster mapping =
        request.getApplicationHomeSubCluster();
    final ApplicationId applicationId = mapping.getApplicationId();

    // Only existing mappings may be updated.
    if (getApp(applicationId) == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + applicationId + " does not exist");
    }
    // update=true: replace the stored home sub-cluster.
    putApp(applicationId, mapping.getHomeSubCluster(), true);

    opDurations.addUpdateAppHomeSubClusterDuration(startTime, clock.getTime());
    return UpdateApplicationHomeSubClusterResponse.newInstance();
  }

  /**
   * Looks up the home sub-cluster of one application.
   *
   * @param request request carrying the application id.
   * @return response with the application id and its home sub-cluster.
   * @throws YarnException if the request is invalid, the application is not
   *         mapped, or ZooKeeper fails.
   */
  @Override
  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
      GetApplicationHomeSubClusterRequest request) throws YarnException {

    final long startTime = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    final ApplicationId applicationId = request.getApplicationId();
    final SubClusterId home = getApp(applicationId);
    if (home == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + applicationId + " does not exist");
    }
    opDurations.addGetAppHomeSubClusterDuration(startTime, clock.getTime());
    return GetApplicationHomeSubClusterResponse.newInstance(applicationId, home);
  }

  /**
   * Lists the application to home sub-cluster mappings stored in ZooKeeper.
   *
   * <p>Children of the applications znode that cannot be turned into a
   * mapping (invalid name, unreadable data, or removed after being listed)
   * are skipped rather than failing the whole listing.
   *
   * @param request request optionally restricting the result to one home
   *        sub-cluster (see {@code FederationStateStoreUtils#filterHomeSubCluster}).
   * @return at most {@code maxAppsInStateStore} mappings, sorted by creation
   *         time, newest first.
   * @throws YarnException if the request is null or ZooKeeper fails.
   */
  @Override
  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
      GetApplicationsHomeSubClusterRequest request) throws YarnException {

    if (request == null) {
      throw new YarnException("Missing getApplicationsHomeSubCluster request");
    }

    try {
      long start = clock.getTime();
      SubClusterId requestSC = request.getSubClusterId();
      List<String> children = zkManager.getChildren(appsZNode);
      List<ApplicationHomeSubCluster> result = children.stream()
          .map(this::generateAppHomeSC)
          // generateAppHomeSC() returns null for entries it cannot read;
          // without this filter the comparator below would throw an NPE.
          .filter(appHomeSC -> appHomeSC != null)
          // Filtering before sorting yields the same result with less work.
          .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
          .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
          .limit(maxAppsInStateStore)
          .collect(Collectors.toList());
      long end = clock.getTime();
      opDurations.addGetAppsHomeSubClusterDuration(start, end);
      LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
      return GetApplicationsHomeSubClusterResponse.newInstance(result);
    } catch (Exception e) {
      String errMsg = "Cannot get apps: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    throw new YarnException("Cannot get app by request");
  }

  /**
   * Builds the mapping for one child of the applications znode.
   *
   * @param appId name of the child znode, expected to be an application id.
   * @return the mapping, or {@code null} if the name is not a valid
   *         application id, the data cannot be read, or no home sub-cluster
   *         is stored any more.
   */
  private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
    try {
      ApplicationId applicationId = ApplicationId.fromString(appId);
      SubClusterId homeSubCluster = getApp(applicationId);
      if (homeSubCluster == null) {
        // getApp() returns null when the znode is gone, i.e. it was removed
        // after getChildren() listed it; a mapping without a home is useless.
        LOG.debug("Application {} no longer has a home subcluster, skipping.", appId);
        return null;
      }
      return ApplicationHomeSubCluster.newInstance(applicationId, homeSubCluster);
    } catch (Exception ex) {
      LOG.error("Cannot get homeSubCluster by appId = {}.", appId, ex);
      return null;
    }
  }

  /**
   * Removes the home sub-cluster mapping of an application.
   *
   * @param request request carrying the application id.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the application is not
   *         mapped, or ZooKeeper fails.
   */
  @Override
  public DeleteApplicationHomeSubClusterResponse
      deleteApplicationHomeSubCluster(
          DeleteApplicationHomeSubClusterRequest request)
              throws YarnException {
    final long startTime = clock.getTime();
    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
    final ApplicationId applicationId = request.getApplicationId();
    final String applicationZNode =
        getNodePath(appsZNode, applicationId.toString());

    boolean present = false;
    try {
      present = zkManager.exists(applicationZNode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot check app: " + e.getMessage());
    }
    if (!present) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Application " + applicationId + " does not exist");
    }

    try {
      zkManager.delete(applicationZNode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot delete app: " + e.getMessage());
    }
    opDurations.addDeleteAppHomeSubClusterDuration(startTime, clock.getTime());
    return DeleteApplicationHomeSubClusterResponse.newInstance();
  }

  /**
   * Registers (or re-registers) a sub-cluster, stamping it with the current
   * time as its last heartbeat.
   *
   * @param request request carrying the sub-cluster information.
   * @return an empty response.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public SubClusterRegisterResponse registerSubCluster(
      SubClusterRegisterRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    final SubClusterInfo info = request.getSubClusterInfo();
    final SubClusterId subClusterId = info.getSubClusterId();

    // Registration counts as a heartbeat.
    info.setLastHeartBeat(getCurrentTime());

    try {
      // update=true: overwrite any previous registration.
      putSubclusterInfo(subClusterId, info, true);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot register subcluster: " + e.getMessage());
    }
    opDurations.addRegisterSubClusterDuration(startTime, clock.getTime());
    return SubClusterRegisterResponse.newInstance();
  }

  /**
   * Changes the state of a registered sub-cluster (e.g. on deregistration).
   *
   * @param request request carrying the sub-cluster id and its new state.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the sub-cluster is not
   *         registered, or ZooKeeper fails.
   */
  @Override
  public SubClusterDeregisterResponse deregisterSubCluster(
      SubClusterDeregisterRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    final SubClusterId subClusterId = request.getSubClusterId();
    final SubClusterState newState = request.getState();

    // Load the stored record, change only its state, and write it back.
    final SubClusterInfo stored = getSubclusterInfo(subClusterId);
    if (stored != null) {
      stored.setState(newState);
      putSubclusterInfo(subClusterId, stored, true);
    } else {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + subClusterId + " not found");
    }
    opDurations.addDeregisterSubClusterDuration(startTime, clock.getTime());
    return SubClusterDeregisterResponse.newInstance();
  }

  /**
   * Records a heartbeat from a registered sub-cluster: refreshes its last
   * heartbeat time, state and capability.
   *
   * @param request heartbeat carrying the sub-cluster id, state and capability.
   * @return an empty response.
   * @throws YarnException if the request is invalid, the sub-cluster is not
   *         registered, or ZooKeeper fails.
   */
  @Override
  public SubClusterHeartbeatResponse subClusterHeartbeat(
      SubClusterHeartbeatRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    final SubClusterId subClusterId = request.getSubClusterId();

    final SubClusterInfo stored = getSubclusterInfo(subClusterId);
    if (stored == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "SubCluster " + subClusterId + " does not exist; cannot heartbeat");
    }

    stored.setLastHeartBeat(getCurrentTime());
    stored.setState(request.getState());
    stored.setCapability(request.getCapability());

    putSubclusterInfo(subClusterId, stored, true);
    opDurations.addSubClusterHeartbeatDuration(startTime, clock.getTime());
    return SubClusterHeartbeatResponse.newInstance();
  }

  /**
   * Looks up the stored information of one sub-cluster.
   *
   * @param request request carrying the sub-cluster id.
   * @return response with the sub-cluster information, or {@code null} if the
   *         sub-cluster is not registered.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public GetSubClusterInfoResponse getSubCluster(
      GetSubClusterInfoRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationMembershipStateStoreInputValidator.validate(request);
    final SubClusterId subClusterId = request.getSubClusterId();

    SubClusterInfo info = null;
    try {
      info = getSubclusterInfo(subClusterId);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot get subcluster: " + e.getMessage());
    }
    if (info == null) {
      LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
      return null;
    }
    opDurations.addGetSubClusterDuration(startTime, clock.getTime());
    return GetSubClusterInfoResponse.newInstance(info);
  }

  /**
   * Lists the registered sub-clusters.
   *
   * <p>Sub-clusters whose znode disappears between listing and reading are
   * skipped instead of failing the whole call.
   *
   * @param request request telling whether inactive sub-clusters should be
   *        filtered out.
   * @return response with the matching sub-cluster information.
   * @throws YarnException if ZooKeeper fails.
   */
  @Override
  public GetSubClustersInfoResponse getSubClusters(
      GetSubClustersInfoRequest request) throws YarnException {
    long start = clock.getTime();
    List<SubClusterInfo> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(membershipZNode)) {
        SubClusterId subClusterId = SubClusterId.newInstance(child);
        SubClusterInfo info = getSubclusterInfo(subClusterId);
        if (info == null) {
          // Removed after getChildren() listed it; info.getState() below
          // would otherwise throw an NPE and fail the whole listing.
          LOG.warn("SubCluster {} does not exist.", subClusterId);
          continue;
        }
        if (!request.getFilterInactiveSubClusters() ||
            info.getState().isActive()) {
          result.add(info);
        }
      }
    } catch (Exception e) {
      String errMsg = "Cannot get subclusters: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetSubClustersDuration(start, end);
    return GetSubClustersInfoResponse.newInstance(result);
  }


  /**
   * Looks up the policy configuration of one queue.
   *
   * @param request request carrying the queue name.
   * @return response with the policy, or {@code null} if the queue has no
   *         stored policy.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationPolicyStoreInputValidator.validate(request);
    final String queue = request.getQueue();

    SubClusterPolicyConfiguration stored = null;
    try {
      stored = getPolicy(queue);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot get policy: " + e.getMessage());
    }

    if (stored != null) {
      opDurations.addGetPolicyConfigurationDuration(startTime, clock.getTime());
      return GetSubClusterPolicyConfigurationResponse.newInstance(stored);
    }
    LOG.warn("Policy for queue: {} does not exist.", queue);
    return null;
  }

  /**
   * Stores (creating or overwriting) the policy configuration of a queue.
   *
   * @param request request carrying the policy configuration.
   * @return an empty response.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationPolicyStoreInputValidator.validate(request);
    final SubClusterPolicyConfiguration configuration =
        request.getPolicyConfiguration();
    try {
      // Keyed by queue name; update=true overwrites an existing policy.
      putPolicy(configuration.getQueue(), configuration, true);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot set policy: " + e.getMessage());
    }
    opDurations.addSetPolicyConfigurationDuration(startTime, clock.getTime());
    return SetSubClusterPolicyConfigurationResponse.newInstance();
  }

  /**
   * Lists the policy configurations of all queues stored in ZooKeeper;
   * queues whose policy cannot be found are skipped with a warning.
   *
   * @param request listing request (not inspected).
   * @return response with every readable policy configuration.
   * @throws YarnException if ZooKeeper fails.
   */
  @Override
  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
    final long startTime = clock.getTime();
    final List<SubClusterPolicyConfiguration> policies = new ArrayList<>();

    try {
      for (String queue : zkManager.getChildren(policiesZNode)) {
        SubClusterPolicyConfiguration policy = getPolicy(queue);
        if (policy != null) {
          policies.add(policy);
        } else {
          LOG.warn("Policy for queue: {} does not exist.", queue);
        }
      }
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot get policies: " + e.getMessage());
    }
    opDurations.addGetPoliciesConfigurationsDuration(startTime, clock.getTime());
    return GetSubClusterPoliciesConfigurationsResponse.newInstance(policies);
  }

  /**
   * Returns the current schema version of this store.
   *
   * @return always {@code null}; this store does not track a version.
   */
  @Override
  public Version getCurrentVersion() {
    return null;
  }

  /**
   * Loads the stored schema version.
   *
   * @return always {@code null}; nothing is read from ZooKeeper.
   */
  @Override
  public Version loadVersion() {
    return null;
  }

  /**
   * Reads the home sub-cluster stored for an application.
   *
   * @param appId Application identifier.
   * @return the stored sub-cluster id, or {@code null} if {@link #get}
   *         returned no data (e.g. the application znode does not exist).
   * @throws YarnException if ZooKeeper cannot be read or the stored data
   *         cannot be parsed.
   */
  private SubClusterId getApp(final ApplicationId appId) throws YarnException {
    final String appZNode = getNodePath(appsZNode, appId.toString());
    final byte[] data = get(appZNode);
    if (data == null) {
      return null;
    }
    try {
      return new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(data));
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse application at " + appZNode);
    }
    return null;
  }

  /**
   * Stores the home sub-cluster of an application.
   *
   * @param appId Application identifier.
   * @param subClusterId Sub-cluster identifier; must be a
   *        {@link SubClusterIdPBImpl} (it is cast to one).
   * @param update whether an already existing mapping is overwritten.
   * @throws YarnException if ZooKeeper cannot be written.
   */
  private void putApp(final ApplicationId appId,
      final SubClusterId subClusterId, boolean update)
          throws YarnException {
    final String znode = getNodePath(appsZNode, appId.toString());
    final byte[] serialized =
        ((SubClusterIdPBImpl) subClusterId).getProto().toByteArray();
    put(znode, serialized, update);
  }

  /**
   * Reads the stored information of a sub-cluster from ZooKeeper.
   *
   * @param subclusterId Sub-cluster identifier.
   * @return the sub-cluster information, or {@code null} if {@link #get}
   *         returned no data (e.g. the membership znode does not exist).
   * @throws YarnException if ZooKeeper cannot be read or the stored data
   *         cannot be parsed.
   */
  private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
      throws YarnException {
    final String memberZNode =
        getNodePath(membershipZNode, subclusterId.toString());
    final byte[] data = get(memberZNode);
    if (data == null) {
      return null;
    }
    try {
      return new SubClusterInfoPBImpl(SubClusterInfoProto.parseFrom(data));
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse subcluster info at " + memberZNode);
    }
    return null;
  }

  /**
   * Stores the information of a sub-cluster in ZooKeeper.
   *
   * @param subclusterId Sub-cluster identifier (names the znode).
   * @param subClusterInfo Sub-cluster information; must be a
   *        {@link SubClusterInfoPBImpl} (it is cast to one).
   * @param update whether existing information is overwritten.
   * @throws YarnException if ZooKeeper cannot be written.
   */
  private void putSubclusterInfo(final SubClusterId subclusterId,
      final SubClusterInfo subClusterInfo, final boolean update)
          throws YarnException {
    final String znode = getNodePath(membershipZNode, subclusterId.toString());
    final byte[] serialized =
        ((SubClusterInfoPBImpl) subClusterInfo).getProto().toByteArray();
    put(znode, serialized, update);
  }

  /**
   * Reads the policy configuration of a queue from ZooKeeper.
   *
   * @param queue Name of the queue.
   * @return the policy configuration, or {@code null} if {@link #get}
   *         returned no data (e.g. the policy znode does not exist).
   * @throws YarnException if ZooKeeper cannot be read or the stored data
   *         cannot be parsed.
   */
  private SubClusterPolicyConfiguration getPolicy(final String queue)
      throws YarnException {
    final String policyZNode = getNodePath(policiesZNode, queue);
    final byte[] data = get(policyZNode);
    if (data == null) {
      return null;
    }
    try {
      return new SubClusterPolicyConfigurationPBImpl(
          SubClusterPolicyConfigurationProto.parseFrom(data));
    } catch (InvalidProtocolBufferException e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot parse policy at " + policyZNode);
    }
    return null;
  }

  /**
   * Stores the policy configuration of a queue in ZooKeeper.
   *
   * @param queue Name of the queue (names the znode).
   * @param policy Policy configuration; must be a
   *        {@link SubClusterPolicyConfigurationPBImpl} (it is cast to one).
   * @param update whether an existing policy is overwritten.
   * @throws YarnException if ZooKeeper cannot be written.
   */
  private void putPolicy(final String queue,
      final SubClusterPolicyConfiguration policy, boolean update)
          throws YarnException {
    final String znode = getNodePath(policiesZNode, queue);
    final byte[] serialized =
        ((SubClusterPolicyConfigurationPBImpl) policy).getProto().toByteArray();
    put(znode, serialized, update);
  }

  /**
   * Get data from a znode in Zookeeper.
   * @param znode Path of the znode.
   * @return Data in the znode, or {@code null} if the znode does not exist,
   *         including when it is deleted between the existence check and
   *         the read.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private byte[] get(String znode) throws YarnException {
    boolean exists = false;
    try {
      exists = zkManager.exists(znode);
    } catch (Exception e) {
      String errMsg = "Cannot find znode " + znode + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    if (!exists) {
      // A missing znode is an expected outcome that callers report in their
      // own way (warning or store exception), so do not log it as an error.
      LOG.debug("{} does not exist", znode);
      return null;
    }

    byte[] data = null;
    try {
      data = zkManager.getData(znode);
    } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
      // Deleted concurrently after exists() returned true: honour the
      // "missing znode => null" contract instead of failing.
      LOG.debug("{} was deleted before its data could be read", znode);
      return null;
    } catch (Exception e) {
      String errMsg = "Cannot get data from znode " + znode
          + ": " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    return data;
  }

  /**
   * Writes data into a znode, creating the znode first if needed.
   *
   * @param znode Path of the znode.
   * @param data Data to write.
   * @param update whether to overwrite the data of an already existing znode;
   *        when {@code false} an existing znode is left untouched.
   * @throws YarnException If it cannot contact ZooKeeper.
   */
  private void put(String znode, byte[] data, boolean update)
      throws YarnException {
    boolean created = false;
    try {
      created = zkManager.create(znode);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot create znode " + znode + ": " + e.getMessage());
    }
    if (!created) {
      LOG.debug("{} not created", znode);
    }
    // An existing znode is only overwritten when the caller asked for it.
    if (!created && !update) {
      LOG.info("{} already existed and we are not updating", znode);
      return;
    }

    try {
      // Expected version -1: per ZooKeeper's setData contract this matches
      // any version, i.e. an unconditional write.
      zkManager.setData(znode, data, -1);
    } catch (Exception e) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot write data into znode " + znode + ": " + e.getMessage());
    }
  }

  /**
   * Get the current time.
   * @return Current wall-clock time in milliseconds since the epoch.
   */
  private static long getCurrentTime() {
    // Epoch milliseconds do not depend on a time zone, so the former
    // Calendar.getInstance(UTC).getTimeInMillis() returned the same value
    // while allocating a Calendar on every call.
    return System.currentTimeMillis();
  }

  /**
   * Stores the home sub-cluster of a reservation.
   *
   * @param reservationId Reservation identifier (names the znode).
   * @param subClusterId Sub-cluster identifier; must be a
   *        {@link SubClusterIdPBImpl} (it is cast to one).
   * @param update whether an existing mapping is overwritten.
   * @throws YarnException if ZooKeeper cannot be written.
   */
  private void putReservation(final ReservationId reservationId,
      final SubClusterId subClusterId, boolean update) throws YarnException {
    final String znode =
        getNodePath(reservationsZNode, reservationId.toString());
    final byte[] serialized =
        ((SubClusterIdPBImpl) subClusterId).getProto().toByteArray();
    put(znode, serialized, update);
  }

  /**
   * Reads the home sub-cluster stored for a reservation.
   *
   * @param reservationId Reservation identifier.
   * @return the stored sub-cluster id, or {@code null} if {@link #get}
   *         returned no data (e.g. the reservation znode does not exist).
   * @throws YarnException if ZooKeeper cannot be read or the stored data
   *         cannot be parsed.
   */
  private SubClusterId getReservation(final ReservationId reservationId)
      throws YarnException {
    String reservationIdZNode = getNodePath(reservationsZNode, reservationId.toString());
    SubClusterId subClusterId = null;
    byte[] data = get(reservationIdZNode);
    if (data != null) {
      try {
        subClusterId = new SubClusterIdPBImpl(SubClusterIdProto.parseFrom(data));
      } catch (InvalidProtocolBufferException e) {
        // Report the znode path, like the other parse errors in this class.
        String errMsg = "Cannot parse reservation at " + reservationIdZNode;
        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
      }
    }
    return subClusterId;
  }

  /**
   * Exposes the operation-duration recorder so tests can inspect it.
   *
   * @return the recorder this store reports its operation durations to.
   */
  @VisibleForTesting
  public ZKFederationStateStoreOpDurations getOpDurations() {
    return this.opDurations;
  }

  /**
   * Adds a reservation to home sub-cluster mapping unless one already exists.
   *
   * @param request request carrying the mapping to add.
   * @return response with the home sub-cluster actually stored, which is the
   *         pre-existing one if the reservation was already mapped.
   * @throws YarnException if the request is invalid or ZooKeeper fails.
   */
  @Override
  public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
      AddReservationHomeSubClusterRequest request) throws YarnException {

    long start = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
    ReservationId reservationId = reservationHomeSubCluster.getReservationId();

    // Try to write the subcluster; update=false keeps an existing mapping.
    SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
    try {
      putReservation(reservationId, homeSubCluster, false);
    } catch (Exception e) {
      String errMsg = "Cannot add reservation home subcluster for " + reservationId;
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }

    // Check for the actual subcluster (may be an earlier mapping).
    try {
      homeSubCluster = getReservation(reservationId);
    } catch (Exception e) {
      // Previously said "app": this path handles reservations.
      String errMsg = "Cannot check reservation home subcluster for " + reservationId;
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addReservationHomeSubClusterDuration(start, end);
    return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
  }

  /**
   * Looks up the home sub-cluster of a single reservation.
   *
   * @param request validated request naming the reservation
   * @return response wrapping the reservation id and its stored home sub-cluster
   * @throws YarnException if the request is invalid, the store read fails, or no mapping is
   *     stored for the reservation
   */
  @Override
  public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
      GetReservationHomeSubClusterRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    final ReservationId id = request.getReservationId();
    final SubClusterId home = getReservation(id);

    if (home == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation " + id + " does not exist");
    }

    final ReservationHomeSubCluster mapping = ReservationHomeSubCluster.newInstance(id, home);
    opDurations.addGetReservationHomeSubClusterDuration(startTime, clock.getTime());
    return GetReservationHomeSubClusterResponse.newInstance(mapping);
  }

  /**
   * Lists every reservation-to-home-sub-cluster mapping stored under the reservations znode.
   * <p>
   * Children for which {@code getReservation} returns {@code null} (no data read;
   * presumably the znode was removed between listing and reading) are skipped rather than
   * reported with a null home sub-cluster.
   *
   * @param request the (unused) listing request
   * @return response carrying all readable mappings
   * @throws YarnException if the children cannot be listed, a child name cannot be parsed as a
   *     reservation id, or a mapping cannot be read
   */
  @Override
  public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
      GetReservationsHomeSubClusterRequest request) throws YarnException {
    long start = clock.getTime();
    List<ReservationHomeSubCluster> result = new ArrayList<>();

    try {
      for (String child : zkManager.getChildren(reservationsZNode)) {
        ReservationId reservationId = ReservationId.parseReservationId(child);
        SubClusterId homeSubCluster = getReservation(reservationId);
        if (homeSubCluster == null) {
          // Nothing was read for this child; do not report a mapping without a home.
          continue;
        }
        result.add(ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster));
      }
    } catch (Exception e) {
      String errMsg = "Cannot get reservations: " + e.getMessage();
      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
    }
    long end = clock.getTime();
    opDurations.addGetReservationsHomeSubClusterDuration(start, end);
    return GetReservationsHomeSubClusterResponse.newInstance(result);
  }

  /**
   * Removes the stored home sub-cluster mapping of a reservation.
   *
   * @param request validated request naming the reservation to delete
   * @return an empty delete response
   * @throws YarnException if the request is invalid, the existence check or delete fails, or
   *     the reservation znode does not exist
   */
  @Override
  public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
      DeleteReservationHomeSubClusterRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    final ReservationId id = request.getReservationId();
    final String znode = getNodePath(reservationsZNode, id.toString());

    // Refuse to delete a mapping that was never stored.
    boolean present = false;
    try {
      present = zkManager.exists(znode);
    } catch (Exception ex) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot check reservation: " + ex.getMessage());
    }
    if (!present) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation " + id + " does not exist");
    }

    try {
      zkManager.delete(znode);
    } catch (Exception ex) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Cannot delete reservation: " + ex.getMessage());
    }

    opDurations.addDeleteReservationHomeSubClusterDuration(startTime, clock.getTime());
    return DeleteReservationHomeSubClusterResponse.newInstance();
  }

  /**
   * Overwrites the home sub-cluster of a reservation that already has a stored mapping.
   *
   * @param request validated request carrying the reservation and its new home sub-cluster
   * @return an empty update response
   * @throws YarnException if the request is invalid, no mapping is stored for the
   *     reservation, or the store read/write fails
   */
  @Override
  public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
      UpdateReservationHomeSubClusterRequest request) throws YarnException {
    final long startTime = clock.getTime();
    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
    final ReservationHomeSubCluster mapping = request.getReservationHomeSubCluster();
    final ReservationId id = mapping.getReservationId();

    // Only an existing mapping may be updated.
    if (getReservation(id) == null) {
      FederationStateStoreUtils.logAndThrowStoreException(LOG,
          "Reservation " + id + " does not exist");
    }

    putReservation(id, mapping.getHomeSubCluster(), true);
    opDurations.addUpdateReservationHomeSubClusterDuration(startTime, clock.getTime());
    return UpdateReservationHomeSubClusterResponse.newInstance();
  }

  /**
   * Storing router master keys is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }

  /**
   * Removing stored router master keys is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }

  /**
   * Looking up a router master key by delegation key is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }

  /**
   * Storing router RM delegation tokens is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }

  /**
   * Updating stored router RM delegation tokens is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }

  /**
   * Removing stored router RM delegation tokens is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }

  /**
   * Looking up a router RM delegation token is not implemented by this state store.
   *
   * @param request ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
      throws YarnException, IOException {
    final String reason = "Code is not implemented";
    throw new NotImplementedException(reason);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop MemoryFederationStateStore 源码

hadoop SQLFederationStateStore 源码

hadoop ZKFederationStateStoreOpDurations 源码

hadoop package-info 源码

0  赞