hadoop FileSystemRMStateStore 源码

  • 2022-10-20
  • 浏览 (218)

hadoop FileSystemRMStateStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;

import org.apache.hadoop.classification.VisibleForTesting;

@Private
@Unstable
/**
 * A simple class for storing RM state in any storage that implements a basic
 * FileSystem interface. Does not use directories so that simple key-value
 * stores can be used. The retry policy for the real filesystem client must be
 * configured separately to enable retry of filesystem operations when needed.
 *
 * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved
 * separately. The currentMasterkey and nextMasterkey have been stored.
 * Also, AMRMToken has been removed from ApplicationAttemptState.
 *
 * Changes from 1.2 to 1.3, Addition of ReservationSystem state.
 */
public class FileSystemRMStateStore extends RMStateStore {

  // Logger shared by the whole state store implementation.
  public static final Logger LOG =
      LoggerFactory.getLogger(FileSystemRMStateStore.class);

  // Name of the directory created under the configured working path; all
  // other roots below are resolved relative to it (see initInternal).
  protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
  // Store version (1.3) returned by getCurrentVersion() and written by
  // storeVersion().
  protected static final Version CURRENT_VERSION_INFO = Version
    .newInstance(1, 3);
  // File name, under amrmTokenSecretManagerRoot, of the AMRMTokenSecretManager
  // state record.
  protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
      "AMRMTokenSecretManagerNode";

  // Extended-attribute name. NOTE(review): presumably applied by
  // setUnreadableBySuperuserXattrib, which is defined outside this view.
  private static final String UNREADABLE_BY_SUPERUSER_XATTRIB =
          "security.hdfs.unreadable.by.superuser";
  // File system handle; created in startInternal(), closed in closeInternal().
  protected FileSystem fs;
  // Private copy of the service configuration used to create fs.
  @VisibleForTesting
  protected Configuration fsConf;

  // <working path>/FSRMStateRoot
  private Path rootDirPath;
  // Directory holding delegation keys, delegation tokens and the sequence
  // number marker file.
  @Private
  @VisibleForTesting
  Path rmDTSecretManagerRoot;
  // Directory holding one sub-directory per application.
  private Path rmAppRoot;
  // Path of the sequence-number marker file most recently written by this
  // instance; null until the first delegation token has been stored.
  private Path dtSequenceNumberPath = null;
  // Maximum number of retries FSAction performs after an IOException.
  private int fsNumRetries;
  // Sleep, in milliseconds, between two FSAction retries.
  private long fsRetryInterval;
  // Value of the intermediate-data-encryption setting read in initInternal().
  private boolean intermediateEncryptionEnabled =
      YarnConfiguration.DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION;

  // Location configured through YarnConfiguration.FS_RM_STATE_STORE_URI.
  @VisibleForTesting
  Path fsWorkingPath;

  // Directory holding the AMRMTokenSecretManager state file.
  Path amrmTokenSecretManagerRoot;
  // Directory scanned by loadReservationSystemState().
  private Path reservationRoot;
  // Directory holding the proxy CA certificate and private key files.
  private Path proxyCARoot;

  /**
   * Resolves every directory used by the store from the configured state
   * store URI and reads the retry and encryption settings. No file system
   * access happens here.
   *
   * @param conf configuration to read the store settings from
   * @throws Exception declared by the parent contract
   */
  @Override
  public synchronized void initInternal(Configuration conf)
      throws Exception{
    final String storeUri = conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI);

    // Directory layout: <working path>/FSRMStateRoot/<per-subsystem root>
    fsWorkingPath = new Path(storeUri);
    rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
    rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
    rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
    amrmTokenSecretManagerRoot =
        new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
    reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT);
    proxyCARoot = new Path(rootDirPath, PROXY_CA_ROOT);

    // Retry policy applied by FSAction.
    fsNumRetries = conf.getInt(
        YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
        YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
    fsRetryInterval = conf.getLong(
        YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);

    intermediateEncryptionEnabled = conf.getBoolean(
        YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION,
        YarnConfiguration.DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION);
  }

  /**
   * Creates the file system handle and makes sure every root directory of
   * the store exists.
   *
   * @throws Exception if the file system cannot be obtained or a directory
   *         cannot be created
   */
  @Override
  protected synchronized void startInternal() throws Exception {
    // The file system is created only now, as part of service-start: by this
    // time the RM is authenticated with kerberos, so it is safe to create a
    // file-system handle.
    fsConf = new Configuration(getConfig());

    // Turn on "fs.<scheme>.impl.disable.cache" for the scheme of the working
    // path, falling back to the scheme of the default file system.
    String scheme = fsWorkingPath.toUri().getScheme();
    if (scheme == null) {
      scheme = FileSystem.getDefaultUri(fsConf).getScheme();
    }
    if (scheme != null) {
      fsConf.setBoolean(
          String.format("fs.%s.impl.disable.cache", scheme), true);
    }

    fs = fsWorkingPath.getFileSystem(fsConf);

    final Path[] requiredDirs = {
        rmDTSecretManagerRoot,
        rmAppRoot,
        amrmTokenSecretManagerRoot,
        reservationRoot,
        proxyCARoot,
    };
    for (Path requiredDir : requiredDirs) {
      mkdirsWithRetries(requiredDir);
    }
  }

  /**
   * Closes the file system handle through the retrying {@link FSAction}.
   *
   * @throws Exception if the close action fails
   */
  @Override
  protected synchronized void closeInternal() throws Exception {
    final FSAction<Void> closeAction = new FSAction<Void>() {
      @Override
      public Void run() throws Exception {
        IOUtils.closeStream(fs);
        return null;
      }
    };
    closeAction.runWithRetries();
  }

  /**
   * Returns the version this implementation writes.
   *
   * @return {@link #CURRENT_VERSION_INFO}
   */
  @Override
  protected Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }

  /**
   * Reads the version record stored under the root directory.
   *
   * @return the stored version, or {@code null} when no version file exists
   * @throws Exception if the file cannot be read or parsed
   */
  @Override
  protected synchronized Version loadVersion() throws Exception {
    final Path versionFile = getNodePath(rootDirPath, VERSION_NODE);
    final FileStatus versionStatus = getFileStatusWithRetries(versionFile);
    if (versionStatus == null) {
      // Nothing has been stored yet.
      return null;
    }
    final byte[] raw =
        readFileWithRetries(versionFile, versionStatus.getLen());
    return new VersionPBImpl(VersionProto.parseFrom(raw));
  }

  /**
   * Writes {@link #CURRENT_VERSION_INFO} to the version file, replacing an
   * existing file through {@code updateFile} or creating a new one.
   *
   * @throws Exception if the version cannot be written
   */
  @Override
  protected synchronized void storeVersion() throws Exception {
    final Path versionFile = getNodePath(rootDirPath, VERSION_NODE);
    final byte[] serialized =
        ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
    if (!existsWithRetries(versionFile)) {
      writeFileWithRetries(versionFile, serialized, false);
      return;
    }
    updateFile(versionFile, serialized, false);
  }
  
  /**
   * Returns the current epoch and persists the next one.
   * When no epoch file exists, the current epoch is {@code baseEpoch} and the
   * file is created; otherwise the stored value is read and the file updated.
   *
   * @return the epoch in effect before the increment
   * @throws Exception if the epoch file cannot be read or written
   */
  @Override
  public synchronized long getAndIncrementEpoch() throws Exception {
    final Path epochFile = getNodePath(rootDirPath, EPOCH_NODE);
    final FileStatus epochStatus = getFileStatusWithRetries(epochFile);
    final boolean epochStored = epochStatus != null;

    long currentEpoch = baseEpoch;
    if (epochStored) {
      // load current epoch
      final byte[] raw = readFileWithRetries(epochFile, epochStatus.getLen());
      currentEpoch = new EpochPBImpl(EpochProto.parseFrom(raw)).getEpoch();
    }

    // Persist the successor of the epoch that is handed out now.
    final byte[] nextEpochData =
        Epoch.newInstance(nextEpoch(currentEpoch)).getProto().toByteArray();
    if (epochStored) {
      updateFile(epochFile, nextEpochData, false);
    } else {
      writeFileWithRetries(epochFile, nextEpochData, false);
    }
    return currentEpoch;
  }
  
  /**
   * Rebuilds the complete RM state from the store by running every
   * subsystem loader, in a fixed order, against one fresh {@code RMState}.
   *
   * @return the recovered state
   * @throws Exception if any loader fails
   */
  @Override
  public synchronized RMState loadState() throws Exception {
    final RMState recovered = new RMState();
    loadRMDTSecretManagerState(recovered);      // DelegationTokenSecretManager
    loadRMAppState(recovered);                  // RM applications
    loadAMRMTokenSecretManagerState(recovered); // AMRMTokenSecretManager
    loadReservationSystemState(recovered);      // reservation state
    loadProxyCAManagerState(recovered);         // ProxyCAManager state
    return recovered;
  }

  /**
   * Feeds every file below the reservation root to a
   * {@code ReservationStateFileProcessor} bound to {@code rmState}.
   *
   * @param rmState state object being populated
   * @throws Exception any failure, logged and rethrown unchanged
   */
  private void loadReservationSystemState(RMState rmState) throws Exception {
    try {
      processDirectoriesOfFiles(
          new ReservationStateFileProcessor(rmState), this.reservationRoot);
    } catch (Exception e) {
      LOG.error("Failed to load state.", e);
      throw e;
    }
  }

  /**
   * Recovers the AMRMTokenSecretManager master keys, if a state file exists.
   * A pending {@code .new} file is finalized first.
   *
   * @param rmState state object being populated
   * @throws Exception if the state file cannot be read or parsed
   */
  private void loadAMRMTokenSecretManagerState(RMState rmState)
      throws Exception {
    checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);

    final Path stateFile =
        new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
    final FileStatus stateStatus = getFileStatusWithRetries(stateFile);
    if (stateStatus != null) {
      assert stateStatus.isFile();
      final byte[] raw = readFileWithRetries(stateFile, stateStatus.getLen());
      final AMRMTokenSecretManagerStatePBImpl parsed =
          new AMRMTokenSecretManagerStatePBImpl(
              AMRMTokenSecretManagerStateProto.parseFrom(raw));
      rmState.amrmTokenSecretManagerState =
          AMRMTokenSecretManagerState.newInstance(
              parsed.getCurrentMasterKey(), parsed.getNextMasterKey());
    }
  }

  /**
   * Recovers applications and their attempts from the application root.
   * Attempts are collected while the files are processed and attached to
   * their owning application afterwards.
   *
   * @param rmState state object being populated
   * @throws Exception any failure, logged and rethrown unchanged
   */
  private void loadRMAppState(RMState rmState) throws Exception {
    try {
      final List<ApplicationAttemptStateData> loadedAttempts =
          new ArrayList<>();
      processDirectoriesOfFiles(
          new RMAppStateFileProcessor(rmState, loadedAttempts),
          this.rmAppRoot);

      // Attach each attempt to its application. Ideally every attempt node
      // has a corresponding app node, because removing the directory removes
      // both at the same time.
      for (ApplicationAttemptStateData attempt : loadedAttempts) {
        final ApplicationAttemptId attemptId = attempt.getAttemptId();
        final ApplicationStateData owner =
            rmState.appState.get(attemptId.getApplicationId());
        assert owner != null;
        owner.attempts.put(attemptId, attempt);
      }
      LOG.info("Done loading applications from FS state store");
    } catch (Exception e) {
      LOG.error("Failed to load state.", e);
      throw e;
    }
  }

  /**
   * Walks a two-level layout (root / directory / file) and hands the content
   * of each file to the given processor.
   * Pending {@code .new} files are finalized per directory, and partial
   * {@code .tmp} records are removed and skipped.
   *
   * @param rmAppStateFileProcessor receiver of (directory name, file name,
   *        file content) triples
   * @param rootDirectory directory whose children are scanned
   * @throws Exception if listing, reading or processing fails
   */
  private void processDirectoriesOfFiles(
      RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory)
    throws Exception {
    for (FileStatus dirStatus : listStatusWithRetries(rootDirectory)) {
      final Path dirPath = dirStatus.getPath();
      checkAndResumeUpdateOperation(dirPath);
      final String dirName = dirPath.getName();

      for (FileStatus child : listStatusWithRetries(dirPath)) {
        assert child.isFile();
        final Path childPath = child.getPath();
        if (checkAndRemovePartialRecordWithRetries(childPath)) {
          continue;
        }
        final byte[] payload =
            readFileWithRetries(childPath, child.getLen());
        // Set attribute if not already set
        setUnreadableBySuperuserXattrib(childPath);

        rmAppStateFileProcessor.processChildNode(
            dirName, childPath.getName(), payload);
      }
    }
  }

  /**
   * Deletes {@code record} when its name ends with {@code .tmp}, which marks
   * a write that failed before the final rename.
   *
   * @param record file to inspect
   * @return {@code true} if the record was a partial one and a delete was
   *         issued, {@code false} otherwise
   * @throws IOException if the delete fails
   */
  private boolean checkAndRemovePartialRecord(Path record) throws IOException {
    final boolean partial = record.getName().endsWith(".tmp");
    if (!partial) {
      return false;
    }
    LOG.error("incomplete rm state store entry found :"
        + record);
    fs.delete(record, false);
    return true;
  }

  /**
   * Completes updates that were interrupted half way.
   * Every {@code X.new} file directly under {@code path} is the result of an
   * {@code updateFile} call that did not finish, so it replaces {@code X}
   * before the state is loaded.
   *
   * @param path directory to scan
   * @throws Exception if listing or replacing fails
   */
  private void checkAndResumeUpdateOperation(Path path) throws Exception {
    final PathFilter pendingUpdates = new PathFilter() {
      @Override
      public boolean accept(Path candidate) {
        return candidate.getName().endsWith(".new");
      }
    };

    for (FileStatus pending : listStatusWithRetries(path, pendingUpdates)) {
      assert pending.isFile();
      final Path pendingPath = pending.getPath();
      final String pendingName = pendingPath.getName();
      // Strip the ".new" suffix to obtain the name of the file to replace.
      final String targetName = pendingName.substring(
          0, pendingName.length() - ".new".length());
      replaceFile(pendingPath, new Path(pendingPath.getParent(), targetName));
    }
  }
  /**
   * Recovers delegation keys, delegation tokens and the token sequence
   * number from the delegation-token secret manager directory.
   * The file name prefix decides how each entry is interpreted; the sequence
   * number is parsed from the file name itself.
   *
   * @param rmState state object being populated
   * @throws Exception if listing, reading or decoding fails
   */
  private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
    checkAndResumeUpdateOperation(rmDTSecretManagerRoot);

    for (FileStatus entry : listStatusWithRetries(rmDTSecretManagerRoot)) {
      assert entry.isFile();
      final String entryName = entry.getPath().getName();
      if (checkAndRemovePartialRecordWithRetries(entry.getPath())) {
        continue;
      }
      if (entryName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
        // The marker file carries its value in the name; nothing to read.
        rmState.rmSecretManagerState.dtSequenceNumber =
            Integer.parseInt(entryName.split("_")[1]);
        continue;
      }

      final byte[] raw = readFileWithRetries(
          getNodePath(rmDTSecretManagerRoot, entryName), entry.getLen());
      try (DataInputStream in =
               new DataInputStream(new ByteArrayInputStream(raw))) {
        if (entryName.startsWith(DELEGATION_KEY_PREFIX)) {
          final DelegationKey masterKey = new DelegationKey();
          masterKey.readFields(in);
          rmState.rmSecretManagerState.masterKeyState.add(masterKey);
          LOG.debug("Loaded delegation key: keyId={}, expirationDate={}",
              masterKey.getKeyId(), masterKey.getExpiryDate());
        } else if (entryName.startsWith(DELEGATION_TOKEN_PREFIX)) {
          final RMDelegationTokenIdentifierData tokenData =
              RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
          final RMDelegationTokenIdentifier tokenId =
              tokenData.getTokenIdentifier();
          final long renewDate = tokenData.getRenewDate();
          rmState.rmSecretManagerState.delegationTokenState.put(tokenId,
              renewDate);
          LOG.debug("Loaded RMDelegationTokenIdentifier: {} renewDate={}",
              tokenId, renewDate);
        } else {
          LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
        }
      }
    }
  }

  /**
   * Recovers the proxy CA certificate and private key.
   * A pending {@code .new} file is finalized first. When either file is
   * missing, a warning is logged and {@code rmState} is left untouched.
   *
   * <p>The file statuses are fetched through the retrying helper and checked
   * for {@code null}. This keeps the lookup consistent with the other load
   * methods and avoids dereferencing a {@code null} status if a file
   * disappears between an existence check and the status lookup.
   *
   * @param rmState state object being populated
   * @throws Exception if the files cannot be read
   */
  private void loadProxyCAManagerState(RMState rmState) throws Exception {
    checkAndResumeUpdateOperation(proxyCARoot);

    Path caCertPath = getNodePath(proxyCARoot, PROXY_CA_CERT_NODE);
    Path caPrivateKeyPath = getNodePath(proxyCARoot, PROXY_CA_PRIVATE_KEY_NODE);

    // getFileStatusWithRetries returns null for a missing file, so it serves
    // as both the existence check and the source of the file length.
    FileStatus caCertFileStatus = getFileStatusWithRetries(caCertPath);
    FileStatus caPrivateKeyFileStatus = (caCertFileStatus == null)
        ? null
        : getFileStatusWithRetries(caPrivateKeyPath);

    if (caCertFileStatus == null || caPrivateKeyFileStatus == null) {
      LOG.warn("Couldn't find Proxy CA data");
      return;
    }

    byte[] caCertData = readFileWithRetries(caCertPath,
        caCertFileStatus.getLen());
    byte[] caPrivateKeyData = readFileWithRetries(caPrivateKeyPath,
        caPrivateKeyFileStatus.getLen());

    rmState.getProxyCAState().setCaCert(caCertData);
    rmState.getProxyCAState().setCaPrivateKey(caPrivateKeyData);
  }

  /**
   * Persists the state of an application for the first time.
   * Creates the per-application directory and writes the serialized state to
   * a file named after the application id inside it.
   *
   * @param appId id of the application
   * @param appStateDataPB state to persist, serialized through its protobuf
   * @throws Exception if the write fails; the failure is logged at ERROR
   *         level and rethrown unchanged
   */
  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    Path appDirPath = getAppDir(rmAppRoot, appId);
    mkdirsWithRetries(appDirPath);
    Path nodeCreatePath = getNodePath(appDirPath, appId.toString());

    LOG.info("Storing info for app: {} at: {}", appId, nodeCreatePath);
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      writeFileWithRetries(nodeCreatePath, appStateData, true);
    } catch (Exception e) {
      // A failed store is an error, not routine information.
      LOG.error("Error storing info for app: {}", appId, e);
      throw e;
    }
  }

  /**
   * Replaces the persisted state of an application.
   *
   * @param appId id of the application
   * @param appStateDataPB new state, serialized through its protobuf
   * @throws Exception if the update fails; the failure is logged at ERROR
   *         level and rethrown unchanged
   */
  @Override
  public synchronized void updateApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    Path appDirPath = getAppDir(rmAppRoot, appId);
    Path nodeCreatePath = getNodePath(appDirPath, appId.toString());

    LOG.info("Updating info for app: {} at: {}", appId, nodeCreatePath);
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      updateFile(nodeCreatePath, appStateData, true);
    } catch (Exception e) {
      // A failed update is an error, not routine information.
      LOG.error("Error updating info for app: {}", appId, e);
      throw e;
    }
  }

  /**
   * Persists the state of an application attempt for the first time.
   * The attempt file is named after the attempt id and lives in the
   * directory of the owning application.
   *
   * @param appAttemptId id of the attempt
   * @param attemptStateDataPB state to persist, serialized through its
   *        protobuf
   * @throws Exception if the write fails; the failure is logged at ERROR
   *         level and rethrown unchanged
   */
  @Override
  public synchronized void storeApplicationAttemptStateInternal(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB)
      throws Exception {
    Path appDirPath =
        getAppDir(rmAppRoot, appAttemptId.getApplicationId());
    Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
    LOG.info("Storing info for attempt: {} at: {}", appAttemptId,
        nodeCreatePath);
    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      writeFileWithRetries(nodeCreatePath, attemptStateData, true);
    } catch (Exception e) {
      // A failed store is an error, not routine information.
      LOG.error("Error storing info for attempt: {}", appAttemptId, e);
      throw e;
    }
  }

  /**
   * Replaces the persisted state of an application attempt.
   *
   * @param appAttemptId id of the attempt
   * @param attemptStateDataPB new state, serialized through its protobuf
   * @throws Exception if the update fails; the failure is logged at ERROR
   *         level and rethrown unchanged
   */
  @Override
  public synchronized void updateApplicationAttemptStateInternal(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB)
      throws Exception {
    Path appDirPath =
        getAppDir(rmAppRoot, appAttemptId.getApplicationId());
    Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
    LOG.info("Updating info for attempt: {} at: {}", appAttemptId,
        nodeCreatePath);
    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      updateFile(nodeCreatePath, attemptStateData, true);
    } catch (Exception e) {
      // A failed update is an error, not routine information.
      LOG.error("Error updating info for attempt: {}", appAttemptId, e);
      throw e;
    }
  }

  /**
   * Deletes the file that stores the given application attempt.
   *
   * @param appAttemptId id of the attempt to remove
   * @throws Exception if the delete fails
   */
  @Override
  public synchronized void removeApplicationAttemptInternal(
      ApplicationAttemptId appAttemptId)
      throws Exception {
    final Path attemptFile = getNodePath(
        getAppDir(rmAppRoot, appAttemptId.getApplicationId()),
        appAttemptId.toString());
    LOG.info("Removing info for attempt: " + appAttemptId + " at: "
        + attemptFile);
    deleteFileWithRetries(attemptFile);
  }

  /**
   * Deletes the directory of the application described by {@code appState};
   * the id is taken from its submission context.
   *
   * @param appState state of the application to remove
   * @throws Exception if the delete fails
   */
  @Override
  public synchronized void removeApplicationStateInternal(
      ApplicationStateData appState)
      throws Exception {
    final ApplicationId appId =
        appState.getApplicationSubmissionContext().getApplicationId();
    final Path appDir = getAppDir(rmAppRoot, appId);
    LOG.info("Removing info for app: " + appId + " at: " + appDir);
    deleteFileWithRetries(appDir);
  }

  /**
   * Persists a new delegation token together with its renew date.
   *
   * @param identifier token identifier
   * @param renewDate renew date stored with the token
   * @throws Exception if the token cannot be written
   */
  @Override
  public synchronized void storeRMDelegationTokenState(
      RMDelegationTokenIdentifier identifier, Long renewDate)
      throws Exception {
    final boolean isUpdate = false;
    storeOrUpdateRMDelegationTokenState(identifier, renewDate, isUpdate);
  }

  /**
   * Deletes the file of the delegation token with the identifier's sequence
   * number.
   *
   * @param identifier identifier of the token to remove
   * @throws Exception if the delete fails
   */
  @Override
  public synchronized void removeRMDelegationTokenState(
      RMDelegationTokenIdentifier identifier) throws Exception {
    final int sequenceNumber = identifier.getSequenceNumber();
    final Path tokenFile = getNodePath(rmDTSecretManagerRoot,
        DELEGATION_TOKEN_PREFIX + sequenceNumber);
    LOG.info("Removing RMDelegationToken_" + sequenceNumber);
    deleteFileWithRetries(tokenFile);
  }

  /**
   * Replaces the stored record of an existing delegation token.
   *
   * @param rmDTIdentifier token identifier
   * @param renewDate new renew date stored with the token
   * @throws Exception if the token cannot be rewritten
   */
  @Override
  protected synchronized void updateRMDelegationTokenState(
      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
      throws Exception {
    final boolean isUpdate = true;
    storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, isUpdate);
  }

  /**
   * Writes a delegation token record.
   * An update only rewrites the token file. A first-time store also records
   * the token's sequence number as the name of a marker file: the marker is
   * created on the first store of this instance and renamed on later ones.
   *
   * @param identifier token identifier
   * @param renewDate renew date stored with the token
   * @param isUpdate {@code true} to replace an existing token file
   * @throws Exception if a write fails, or if the marker file cannot be
   *         created or renamed
   */
  private void storeOrUpdateRMDelegationTokenState(
      RMDelegationTokenIdentifier identifier, Long renewDate,
      boolean isUpdate) throws Exception {
    final Path tokenFile = getNodePath(rmDTSecretManagerRoot,
        DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
    final RMDelegationTokenIdentifierData tokenData =
        new RMDelegationTokenIdentifierData(identifier, renewDate);

    if (isUpdate) {
      LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
      updateFile(tokenFile, tokenData.toByteArray(), true);
      return;
    }

    LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
    writeFileWithRetries(tokenFile, tokenData.toByteArray(), true);

    // store sequence number
    final Path newMarker = getNodePath(rmDTSecretManagerRoot,
        DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
            + identifier.getSequenceNumber());
    LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
        + identifier.getSequenceNumber());
    if (dtSequenceNumberPath != null) {
      if (!renameFileWithRetries(dtSequenceNumberPath, newMarker)) {
        throw new Exception("Failed to rename " + dtSequenceNumberPath);
      }
    } else if (!createFileWithRetries(newMarker)) {
      throw new Exception("Failed to create " + newMarker);
    }
    dtSequenceNumberPath = newMarker;
  }

  /**
   * Serializes a delegation master key and stores it in a file named after
   * its key id.
   *
   * @param masterKey key to persist
   * @throws Exception if serialization or the write fails
   */
  @Override
  public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
      throws Exception {
    final Path keyFile = getNodePath(rmDTSecretManagerRoot,
        DELEGATION_KEY_PREFIX + masterKey.getKeyId());
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
      masterKey.write(out);
      final byte[] serializedKey = buffer.toByteArray();
      writeFileWithRetries(keyFile, serializedKey, true);
    }
  }

  /**
   * Deletes the file that stores the given delegation master key.
   *
   * @param masterKey key whose record is removed
   * @throws Exception if the delete fails
   */
  @Override
  public synchronized void
      removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
    final int keyId = masterKey.getKeyId();
    final Path keyFile =
        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + keyId);
    LOG.info("Removing RMDelegationKey_"+ keyId);
    deleteFileWithRetries(keyFile);
  }

  /**
   * Removes the whole store by deleting its root directory, if present.
   *
   * @throws Exception if the existence check or the delete fails
   */
  @Override
  public synchronized void deleteStore() throws Exception {
    final boolean storePresent = existsWithRetries(rootDirPath);
    if (!storePresent) {
      return;
    }
    deleteFileWithRetries(rootDirPath);
  }

  /**
   * Deletes the directory of the given application, if it exists.
   *
   * @param removeAppId id of the application to remove
   * @throws Exception if the existence check or the delete fails
   */
  @Override
  public synchronized void removeApplication(ApplicationId removeAppId)
      throws Exception {
    final Path appDir = getAppDir(rmAppRoot, removeAppId);
    if (!existsWithRetries(appDir)) {
      return;
    }
    deleteFileWithRetries(appDir);
  }

  /**
   * Persists the encoded proxy CA certificate and private key, each in its
   * own file. An existing file is replaced through {@code updateFile}; a
   * missing one is created.
   *
   * @param caCert certificate to persist
   * @param caPrivateKey private key to persist
   * @throws Exception if encoding or a write fails
   */
  @Override
  protected synchronized void storeProxyCACertState(
      X509Certificate caCert, PrivateKey caPrivateKey) throws Exception {
    final byte[] certBytes = caCert.getEncoded();
    final byte[] keyBytes = caPrivateKey.getEncoded();

    final Path certFile = getNodePath(proxyCARoot, PROXY_CA_CERT_NODE);
    final Path keyFile = getNodePath(proxyCARoot, PROXY_CA_PRIVATE_KEY_NODE);

    if (!existsWithRetries(certFile)) {
      writeFileWithRetries(certFile, certBytes, true);
    } else {
      updateFile(certFile, certBytes, true);
    }

    if (!existsWithRetries(keyFile)) {
      writeFileWithRetries(keyFile, keyBytes, true);
    } else {
      updateFile(keyFile, keyBytes, true);
    }
  }

  /**
   * Resolves the directory of an application below {@code root}; the
   * directory is named after the application id.
   */
  private Path getAppDir(Path root, ApplicationId appId) {
    return getNodePath(root, appId.toString());
  }

  /**
   * Resolves the directory of an application below the application root.
   *
   * @param appId id of the application
   * @return path of the application's directory
   */
  @VisibleForTesting
  protected Path getAppDir(ApplicationId appId) {
    return getAppDir(rmAppRoot, appId);
  }

  /**
   * Resolves the path of an attempt: a node named after the attempt id inside
   * the directory of the owning application.
   *
   * @param appAttId id of the attempt
   * @return path of the attempt node
   */
  @VisibleForTesting
  protected Path getAppAttemptDir(ApplicationAttemptId appAttId) {
    final Path owningAppDir = getAppDir(appAttId.getApplicationId());
    return getNodePath(owningAppDir, appAttId.toString());
  }
  // FileSystem related code

  /**
   * Retrying variant of {@link #checkAndRemovePartialRecord(Path)}.
   *
   * @param record file to inspect
   * @return {@code true} if the record was partial
   * @throws Exception if the action keeps failing
   */
  private boolean checkAndRemovePartialRecordWithRetries(final Path record)
      throws Exception {
    final FSAction<Boolean> action = new FSAction<Boolean>() {
      @Override
      public Boolean run() throws Exception {
        return checkAndRemovePartialRecord(record);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Creates a directory through the retrying {@link FSAction}.
   * The boolean result of {@code mkdirs} is not inspected.
   *
   * @param appDirPath directory to create
   * @throws Exception if the action keeps failing
   */
  private void mkdirsWithRetries(final Path appDirPath) throws Exception {
    final FSAction<Void> action = new FSAction<Void>() {
      @Override
      public Void run() throws Exception {
        fs.mkdirs(appDirPath);
        return null;
      }
    };
    action.runWithRetries();
  }

  /**
   * Retrying variant of {@link #writeFile(Path, byte[], boolean)}.
   *
   * @param outputPath file to write
   * @param data content of the file
   * @param makeUnreadableByAdmin forwarded unchanged to {@code writeFile}
   * @throws Exception if the action keeps failing
   */
  private void writeFileWithRetries(final Path outputPath, final byte[] data,
                                    final boolean makeUnreadableByAdmin)
          throws Exception {
    final FSAction<Void> action = new FSAction<Void>() {
      @Override
      public Void run() throws Exception {
        writeFile(outputPath, data, makeUnreadableByAdmin);
        return null;
      }
    };
    action.runWithRetries();
  }

  /**
   * Retrying variant of {@link #deleteFile(Path)}.
   *
   * @param deletePath file or directory to delete
   * @throws Exception if the action keeps failing
   */
  private void deleteFileWithRetries(final Path deletePath) throws Exception {
    final FSAction<Void> action = new FSAction<Void>() {
      @Override
      public Void run() throws Exception {
        deleteFile(deletePath);
        return null;
      }
    };
    action.runWithRetries();
  }

  /**
   * Retrying variant of {@link #renameFile(Path, Path)}.
   *
   * @param src current path
   * @param dst new path
   * @return the result reported by {@code renameFile}
   * @throws Exception if the action keeps failing
   */
  private boolean renameFileWithRetries(final Path src, final Path dst)
      throws Exception {
    final FSAction<Boolean> action = new FSAction<Boolean>() {
      @Override
      public Boolean run() throws Exception {
        return renameFile(src, dst);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Retrying variant of {@link #createFile(Path)}.
   *
   * @param newFile file to create
   * @return the result reported by {@code createFile}
   * @throws Exception if the action keeps failing
   */
  private boolean createFileWithRetries(final Path newFile) throws Exception {
    final FSAction<Boolean> action = new FSAction<Boolean>() {
      @Override
      public Boolean run() throws Exception {
        return createFile(newFile);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Retrying variant of {@link #getFileStatus(Path)}.
   *
   * @param path path to look up
   * @return the status, or {@code null} when the path does not exist
   * @throws Exception if the action keeps failing
   */
  private FileStatus getFileStatusWithRetries(final Path path)
      throws Exception {
    final FSAction<FileStatus> action = new FSAction<FileStatus>() {
      @Override
      public FileStatus run() throws Exception {
        return getFileStatus(path);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Checks through the retrying {@link FSAction} whether a path exists.
   *
   * @param path path to check
   * @return the result of {@code fs.exists(path)}
   * @throws Exception if the action keeps failing
   */
  private boolean existsWithRetries(final Path path) throws Exception {
    final FSAction<Boolean> action = new FSAction<Boolean>() {
      @Override
      public Boolean run() throws Exception {
        return fs.exists(path);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Retrying variant of {@link #readFile(Path, long)}.
   *
   * @param inputPath file to read
   * @param len number of bytes to read
   * @return the bytes read
   * @throws Exception if the action keeps failing
   */
  private byte[] readFileWithRetries(final Path inputPath, final long len)
      throws Exception {
    final FSAction<byte[]> action = new FSAction<byte[]>() {
      @Override
      public byte[] run() throws Exception {
        return readFile(inputPath, len);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Lists the children of a path through the retrying {@link FSAction}.
   *
   * @param path directory to list
   * @return the result of {@code fs.listStatus(path)}
   * @throws Exception if the action keeps failing
   */
  private FileStatus[] listStatusWithRetries(final Path path)
      throws Exception {
    final FSAction<FileStatus[]> action = new FSAction<FileStatus[]>() {
      @Override
      public FileStatus[] run() throws Exception {
        return fs.listStatus(path);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Lists the children of a path that pass {@code filter}, through the
   * retrying {@link FSAction}.
   *
   * @param path directory to list
   * @param filter filter handed to {@code fs.listStatus}
   * @return the result of {@code fs.listStatus(path, filter)}
   * @throws Exception if the action keeps failing
   */
  private FileStatus[] listStatusWithRetries(final Path path,
      final PathFilter filter) throws Exception {
    final FSAction<FileStatus[]> action = new FSAction<FileStatus[]>() {
      @Override
      public FileStatus[] run() throws Exception {
        return fs.listStatus(path, filter);
      }
    };
    return action.runWithRetries();
  }

  /**
   * Closes the file system handle through the retrying {@link FSAction}.
   *
   * @throws Exception if the action fails
   */
  private void closeWithRetries() throws Exception {
    final FSAction<Void> action = new FSAction<Void>() {
      @Override
      public Void run() throws Exception {
        IOUtils.closeStream(fs);
        return null;
      }
    };
    action.runWithRetries();
  }

  /**
   * A file system operation that can be re-executed when it fails with an
   * {@link IOException}.
   *
   * @param <T> result type of the operation
   */
  private abstract class FSAction<T> {
    /** The operation itself; called once per attempt. */
    abstract T run() throws Exception;

    /**
     * Executes {@link #run()} until it returns, retrying after an
     * {@code IOException} at most {@code fsNumRetries} times and sleeping
     * {@code fsRetryInterval} milliseconds between attempts. Any other
     * exception propagates immediately.
     *
     * @return the value returned by {@code run()}
     * @throws Exception the last {@code IOException} once the retries are
     *         used up, or any other exception thrown by {@code run()} or by
     *         the sleep
     */
    T runWithRetries() throws Exception {
      // failures counts the IOExceptions seen so far, including the current.
      for (int failures = 1; ; failures++) {
        try {
          return run();
        } catch (IOException ioe) {
          LOG.info("Exception while executing an FS operation.", ioe);
          if (failures > fsNumRetries) {
            LOG.info("Maxed out FS retries. Giving up!");
            throw ioe;
          }
          LOG.info("Retrying operation on FS. Retry no. " + failures);
          Thread.sleep(fsRetryInterval);
        }
      }
    }
  }

  /**
   * Recursively deletes a path and treats a {@code false} result as failure.
   *
   * @param deletePath file or directory to delete
   * @throws Exception if the file system reports that nothing was deleted
   */
  private void deleteFile(Path deletePath) throws Exception {
    final boolean deleted = fs.delete(deletePath, true);
    if (deleted) {
      return;
    }
    throw new Exception("Failed to delete " + deletePath);
  }

  /**
   * Reads exactly {@code len} bytes from the start of a file.
   *
   * @param inputPath file to read
   * @param len number of bytes to read; narrowed to {@code int}
   * @return the bytes read
   * @throws Exception if the file cannot be opened or is shorter than
   *         {@code len}
   */
  private byte[] readFile(Path inputPath, long len) throws Exception {
    FSDataInputStream in = null;
    final byte[] content;
    try {
      in = fs.open(inputPath);
      // state data will not be that "long"
      content = new byte[(int) len];
      in.readFully(content);
    } finally {
      // Closes the stream (if opened) and logs, rather than throws, on error.
      IOUtils.cleanupWithLogger(LOG, in);
    }
    return content;
  }

  /**
   * Looks up the status of a path, mapping "not found" to {@code null}.
   *
   * @param path path to look up
   * @return the status, or {@code null} when the path does not exist
   * @throws Exception any other failure of the lookup
   */
  private FileStatus getFileStatus(Path path) throws Exception {
    FileStatus status;
    try {
      status = fs.getFileStatus(path);
    } catch (FileNotFoundException notFound) {
      // A missing path is an expected outcome for callers.
      status = null;
    }
    return status;
  }

  /**
   * Writes {@code data} to {@code outputPath}.
   *
   * <p>To make the write atomic, the data is first written to a sibling
   * {@code .tmp} file, which is then renamed to the final name. This assumes
   * that rename is atomic for the underlying file system.
   *
   * <p>A rename that reports failure is logged as a warning, because in that
   * case the data stays in the {@code .tmp} file and is not visible under
   * {@code outputPath}. No exception is raised for it, so existing callers
   * observe the same control flow as before.
   *
   * @param outputPath final location of the file
   * @param data content to write
   * @param makeUnreadableByAdmin when {@code true}, the temporary file is
   *        passed to {@code setUnreadableBySuperuserXattrib} before any data
   *        is written
   * @throws Exception if creating, writing, closing or renaming throws
   */
  protected void writeFile(Path outputPath, byte[] data, boolean
          makeUnreadableByAdmin) throws Exception {
    Path tempPath =
        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
    FSDataOutputStream fsOut = null;
    // This file will be overwritten when app/attempt finishes for saving the
    // final status.
    try {
      fsOut = fs.create(tempPath, true);
      if (makeUnreadableByAdmin) {
        setUnreadableBySuperuserXattrib(tempPath);
      }
      fsOut.write(data);
      fsOut.close();
      // Already closed; keep the finally block from closing it again.
      fsOut = null;
      if (!fs.rename(tempPath, outputPath)) {
        LOG.warn("Failed to rename {} to {}", tempPath, outputPath);
      }
    } finally {
      IOUtils.cleanupWithLogger(LOG, fsOut);
    }
  }

  /**
   * Replaces the content stored at {@code outputPath}.
   *
   * To make the update atomic, the data is first written to a sibling ".new"
   * file, which then takes the place of the existing file. This assumes that
   * rename is atomic for the underlying file system.
   *
   * @param outputPath file whose content is replaced
   * @param data new content
   * @param makeUnreadableByAdmin forwarded unchanged to writeFileWithRetries
   * @throws Exception if writing or replacing the file fails
   */
  protected void updateFile(Path outputPath, byte[] data, boolean
          makeUnreadableByAdmin) throws Exception {
    final String stagedName = outputPath.getName() + ".new";
    final Path stagedPath = new Path(outputPath.getParent(), stagedName);
    // Go through writeFileWithRetries so that the .new file itself is
    // created atomically.
    writeFileWithRetries(stagedPath, data, makeUnreadableByAdmin);
    replaceFile(stagedPath, outputPath);
  }

  /**
   * Moves {@code srcPath} to {@code dstPath}, deleting any file that already
   * exists at the destination first.
   *
   * @param srcPath file to move
   * @param dstPath location the file is moved to
   * @throws Exception if the existence check, delete or rename fails
   */
  protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
    final boolean dstPresent = existsWithRetries(dstPath);
    if (!dstPresent) {
      LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
    } else {
      deleteFileWithRetries(dstPath);
    }
    renameFileWithRetries(srcPath, dstPath);
  }

  /**
   * Renames {@code src} to {@code dst} with a single file system call.
   *
   * @param src current path
   * @param dst new path
   * @return the value returned by the file system's rename call
   * @throws Exception if the rename call fails
   */
  @Private
  @VisibleForTesting
  boolean renameFile(Path src, Path dst) throws Exception {
    final boolean renamed = fs.rename(src, dst);
    return renamed;
  }

  /**
   * Asks the file system to create {@code newFile} as a new file.
   *
   * @param newFile path of the file to create
   * @return the value returned by the file system's createNewFile call
   * @throws Exception if the create call fails
   */
  private boolean createFile(Path newFile) throws Exception {
    final boolean created = fs.createNewFile(newFile);
    return created;
  }

  /**
   * Builds the path of the child {@code nodeName} under {@code root}.
   *
   * @param root parent path
   * @param nodeName name of the child
   * @return path of the child below the parent
   */
  @Private
  @VisibleForTesting
  Path getNodePath(Path root, String nodeName) {
    final Path nodePath = new Path(root, nodeName);
    return nodePath;
  }

  /**
   * Persists the AMRM token secret manager state as a serialized proto under
   * the AMRM token secret manager root.
   *
   * @param amrmTokenSecretManagerState state to persist
   * @param isUpdate {@code true} to go through updateFile, {@code false} to
   *        write the file directly with writeFileWithRetries
   * @throws Exception if the state cannot be written
   */
  @Override
  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
      AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
      throws Exception {
    final Path statePath =
        getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
    // Serialize a copy created via newInstance rather than the argument.
    final AMRMTokenSecretManagerState copy =
        AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
    final byte[] serialized = copy.getProto().toByteArray();
    if (!isUpdate) {
      writeFileWithRetries(statePath, serialized, true);
      return;
    }
    updateFile(statePath, serialized, true);
  }

  /**
   * Stores a reservation as a file named after the reservation inside a
   * per-plan directory below the reservation root. The plan directory is
   * created before the file is written.
   *
   * @param reservationAllocation reservation data to serialize
   * @param planName name of the plan, used as the directory name
   * @param reservationIdName name of the reservation, used as the file name
   * @throws Exception if the directory or file cannot be written
   */
  @Override
  protected void storeReservationState(
      ReservationAllocationStateProto reservationAllocation, String planName,
      String reservationIdName) throws Exception {
    final Path planDir = getNodePath(reservationRoot, planName);
    mkdirsWithRetries(planDir);
    final Path reservationFile = getNodePath(planDir, reservationIdName);
    final String message = "Storing state for reservation "
        + reservationIdName + " from " + "plan " + planName + " at path "
        + reservationFile;
    LOG.info(message);
    writeFileWithRetries(reservationFile,
        reservationAllocation.toByteArray(), true);
  }

  /**
   * Deletes the file holding a reservation from its plan directory below the
   * reservation root.
   *
   * @param planName name of the plan, used as the directory name
   * @param reservationIdName name of the reservation, used as the file name
   * @throws Exception if the delete fails
   */
  @Override
  protected void removeReservationState(
      String planName, String reservationIdName) throws Exception {
    final Path reservationFile = getNodePath(
        getNodePath(reservationRoot, planName), reservationIdName);
    final String message = "Removing state for reservation "
        + reservationIdName + " from " + "plan " + planName + " at path "
        + reservationFile;
    LOG.info(message);
    deleteFileWithRetries(reservationFile);
  }

  /**
   * @return the retry limit applied to file system operations
   */
  @VisibleForTesting
  public int getNumRetries() {
    final int retryLimit = fsNumRetries;
    return retryLimit;
  }

  /**
   * @return the pause applied between two attempts of a file system operation
   */
  @VisibleForTesting
  public long getRetryInterval() {
    final long pauseBetweenAttempts = fsRetryInterval;
    return pauseBetweenAttempts;
  }

  /**
   * Sets the UNREADABLE_BY_SUPERUSER_XATTRIB extended attribute on {@code p}
   * when intermediate encryption is enabled, the file system scheme contains
   * "hdfs", and the attribute is not already present on the path.
   *
   * @param p path to tag
   * @throws IOException if reading or setting the extended attributes fails
   */
  private void setUnreadableBySuperuserXattrib(Path p) throws IOException {
    // Check the local flag first: with encryption disabled there is no need
    // to touch the file system at all, and getScheme() is not guaranteed to
    // be supported by every FileSystem implementation.
    if (!intermediateEncryptionEnabled) {
      return;
    }
    if (!fs.getScheme().toLowerCase().contains("hdfs")) {
      return;
    }
    // The attribute is set with the CREATE flag, so skip paths that already
    // carry it.
    if (fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
      return;
    }
    fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
        EnumSet.of(XAttrSetFlag.CREATE));
  }

  /**
   * {@link RMStateFileProcessor} that parses each child file as a
   * {@link ReservationAllocationStateProto} and records it in the reservation
   * state of the supplied {@link RMState}, grouped by plan name.
   */
  private static class ReservationStateFileProcessor implements
      RMStateFileProcessor {
    private final RMState rmState;

    public ReservationStateFileProcessor(RMState state) {
      this.rmState = state;
    }

    /**
     * Parses one reservation file and stores the result under its plan.
     *
     * @param planName name of the plan the reservation belongs to
     * @param childNodeName file name, parsed as the reservation id
     * @param childData serialized reservation allocation
     * @throws IOException if the data or the reservation id cannot be parsed
     */
    @Override
    public void processChildNode(String planName, String childNodeName,
        byte[] childData) throws IOException {
      // The payload is parsed before the state is touched.
      final ReservationAllocationStateProto parsed =
          ReservationAllocationStateProto.parseFrom(childData);
      final boolean planKnown =
          rmState.getReservationState().containsKey(planName);
      if (!planKnown) {
        rmState.getReservationState().put(planName,
            new HashMap<ReservationId, ReservationAllocationStateProto>());
      }
      final ReservationId id =
          ReservationId.parseReservationId(childNodeName);
      rmState.getReservationState().get(planName).put(id, parsed);
    }
  }

  /**
   * {@link RMStateFileProcessor} for the files of an application directory.
   * Files whose name starts with the application id prefix are loaded as
   * application state into the {@link RMState}; files whose name starts with
   * the application attempt id prefix are collected into the attempts list.
   */
  private static class RMAppStateFileProcessor implements RMStateFileProcessor {
    private final RMState rmState;
    private final List<ApplicationAttemptStateData> attempts;

    public RMAppStateFileProcessor(RMState rmState,
        List<ApplicationAttemptStateData> attempts) {
      this.rmState = rmState;
      this.attempts = attempts;
    }

    /**
     * Dispatches one child file according to the prefix of its name. Names
     * matching neither prefix are only logged.
     *
     * @param appDirName name of the enclosing directory (not used here)
     * @param childNodeName name of the file being processed
     * @param childData serialized content of the file
     * @throws InvalidProtocolBufferException if the content cannot be parsed
     */
    @Override
    public void processChildNode(String appDirName, String childNodeName,
        byte[] childData) throws InvalidProtocolBufferException {
      if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
        recordApplication(childNodeName, childData);
        return;
      }
      if (childNodeName.startsWith(
          ApplicationAttemptId.appAttemptIdStrPrefix)) {
        recordAttempt(childNodeName, childData);
        return;
      }
      LOG.info("Unknown child node with name: " + childNodeName);
    }

    /** Parses application state and stores it keyed by its application id. */
    private void recordApplication(String nodeName, byte[] payload)
        throws InvalidProtocolBufferException {
      LOG.debug("Loading application from node: {}", nodeName);
      ApplicationStateDataProto proto =
          ApplicationStateDataProto.parseFrom(payload);
      ApplicationStateDataPBImpl application =
          new ApplicationStateDataPBImpl(proto);
      ApplicationId id =
          application.getApplicationSubmissionContext().getApplicationId();
      rmState.appState.put(id, application);
    }

    /** Parses attempt state and appends it to the attempts list. */
    private void recordAttempt(String nodeName, byte[] payload)
        throws InvalidProtocolBufferException {
      LOG.debug("Loading application attempt from node: {}", nodeName);
      ApplicationAttemptStateDataProto proto =
          ApplicationAttemptStateDataProto.parseFrom(payload);
      attempts.add(new ApplicationAttemptStateDataPBImpl(proto));
    }
  }

  /**
   * Interface for common state processing of a directory-of-files layout.
   * An implementation receives the content of one child file per call.
   */
  private interface RMStateFileProcessor {
    /**
     * Handles one child file.
     *
     * @param appDirName name of the directory that contains the file
     * @param childNodeName name of the file
     * @param childData content of the file
     * @throws IOException if the content cannot be processed
     */
    void processChildNode(
        String appDirName, String childNodeName, byte[] childData)
        throws IOException;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop LeveldbRMStateStore 源码

hadoop MemoryRMStateStore 源码

hadoop NullRMStateStore 源码

hadoop RMStateStore 源码

hadoop RMStateStoreAMRMTokenEvent 源码

hadoop RMStateStoreAppAttemptEvent 源码

hadoop RMStateStoreAppEvent 源码

hadoop RMStateStoreEvent 源码

hadoop RMStateStoreEventType 源码

hadoop RMStateStoreFactory 源码

0  赞