hadoop LeveldbRMStateStore 源码

  • 2022-10-20
  • 浏览 (181)

hadoop LeveldbRMStateStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;

import org.apache.hadoop.classification.VisibleForTesting;

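/**
 * {@link RMStateStore} implementation that keeps ResourceManager recovery
 * state in a LevelDB database.
 *
 * Store version history: changes from 1.0 to 1.1 are the addition of
 * ReservationSystem state.
 */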
public class LeveldbRMStateStore extends RMStateStore {

  /** Logger used by this store. */
  public static final Logger LOG =
      LoggerFactory.getLogger(LeveldbRMStateStore.class);

  /** Delimiter placed between the components of a database key. */
  private static final String SEPARATOR = "/";
  /** Child path name appended to the configured store path. */
  private static final String DB_NAME = "yarn-rm-state";
  /** Key prefix for delegation token master keys; the key id is appended. */
  private static final String RM_DT_MASTER_KEY_KEY_PREFIX =
      RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX;
  /** Key prefix for delegation tokens; the sequence number is appended. */
  private static final String RM_DT_TOKEN_KEY_PREFIX =
      RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_TOKEN_PREFIX;
  /** Key holding the delegation token sequence number. */
  private static final String RM_DT_SEQUENCE_NUMBER_KEY =
      RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
  /** Key prefix used when scanning for application entries. */
  private static final String RM_APP_KEY_PREFIX =
      RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
  /** Key prefix used when scanning for reservation entries. */
  private static final String RM_RESERVATION_KEY_PREFIX =
      RESERVATION_SYSTEM_ROOT + SEPARATOR;

  /** Version returned by getCurrentVersion() and written by storeVersion(). */
  private static final Version CURRENT_VERSION_INFO = Version
      .newInstance(1, 1);

  /** Database handle obtained from dbManager.initDatabase in startInternal. */
  private DB db;
  /** Helper used to open/close the database and to load/store the version. */
  private DBManager dbManager = new DBManager();
  /** Compaction interval in milliseconds, computed in initInternal. */
  private long compactionIntervalMsec;

  /**
   * Builds the database key under which an application's state is stored.
   *
   * @param appId the application identifier
   * @return RM_APP_ROOT, the separator and the string form of {@code appId}
   */
  private String getApplicationNodeKey(ApplicationId appId) {
    StringBuilder key = new StringBuilder();
    key.append(RM_APP_ROOT).append(SEPARATOR).append(appId);
    return key.toString();
  }

  /**
   * Builds the database key for an application attempt, deriving the parent
   * application key from the attempt's application id.
   *
   * @param attemptId the application attempt identifier
   * @return the attempt key nested under its application key
   */
  private String getApplicationAttemptNodeKey(ApplicationAttemptId attemptId) {
    String parentKey = getApplicationNodeKey(attemptId.getApplicationId());
    return getApplicationAttemptNodeKey(parentKey, attemptId);
  }

  /**
   * Builds the database key for an application attempt from an already
   * computed application key.
   *
   * @param appNodeKey key of the owning application
   * @param attemptId the application attempt identifier
   * @return {@code appNodeKey}, the separator and the string form of
   *         {@code attemptId}
   */
  private String getApplicationAttemptNodeKey(String appNodeKey,
      ApplicationAttemptId attemptId) {
    StringBuilder key = new StringBuilder();
    key.append(appNodeKey).append(SEPARATOR).append(attemptId);
    return key.toString();
  }

  /**
   * Builds the database key for a delegation token master key.
   *
   * @param masterKey the master key whose key id is appended to the prefix
   * @return the master-key prefix followed by the key id
   */
  private String getRMDTMasterKeyNodeKey(DelegationKey masterKey) {
    return new StringBuilder(RM_DT_MASTER_KEY_KEY_PREFIX)
        .append(masterKey.getKeyId())
        .toString();
  }

  /**
   * Builds the database key for a delegation token.
   *
   * @param tokenId the token whose sequence number is appended to the prefix
   * @return the token prefix followed by the sequence number
   */
  private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) {
    return new StringBuilder(RM_DT_TOKEN_KEY_PREFIX)
        .append(tokenId.getSequenceNumber())
        .toString();
  }

  /**
   * Builds the database key for a reservation within a plan.
   *
   * @param planName name of the plan owning the reservation
   * @param reservationId string form of the reservation id
   * @return RESERVATION_SYSTEM_ROOT, plan name and reservation id joined by
   *         the separator
   */
  private String getReservationNodeKey(String planName,
      String reservationId) {
    StringBuilder key = new StringBuilder();
    key.append(RESERVATION_SYSTEM_ROOT).append(SEPARATOR);
    key.append(planName).append(SEPARATOR);
    key.append(reservationId);
    return key.toString();
  }

  /**
   * @return the database key under which the proxy CA certificate is stored
   */
  private String getProxyCACertNodeKey() {
    StringBuilder key = new StringBuilder();
    key.append(PROXY_CA_ROOT).append(SEPARATOR).append(PROXY_CA_CERT_NODE);
    return key.toString();
  }

  /**
   * @return the database key under which the proxy CA private key is stored
   */
  private String getProxyCAPrivateKeyNodeKey() {
    StringBuilder key = new StringBuilder();
    key.append(PROXY_CA_ROOT).append(SEPARATOR);
    key.append(PROXY_CA_PRIVATE_KEY_NODE);
    return key.toString();
  }

  /**
   * Reads the compaction interval (configured in seconds) and stores it in
   * milliseconds for later use by {@link #startInternal()}.
   *
   * @param conf configuration to read the interval from
   */
  @Override
  protected void initInternal(Configuration conf) {
    long intervalSecs = conf.getLong(
        YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
        YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS);
    compactionIntervalMsec = intervalSecs * 1000;
  }

  /**
   * Resolves the database directory: the configured store path with
   * DB_NAME appended as a child.
   *
   * @return path of the database directory
   * @throws IOException if no store path is configured
   */
  private Path getStorageDir() throws IOException {
    String configuredPath =
        getConfig().get(YarnConfiguration.RM_LEVELDB_STORE_PATH);
    if (configuredPath != null) {
      return new Path(configuredPath, DB_NAME);
    }
    throw new IOException("No store location directory configured in " +
        YarnConfiguration.RM_LEVELDB_STORE_PATH);
  }

  /**
   * Creates the database directory on the local file system with mode 0700.
   *
   * @return path of the database directory
   * @throws IOException if the path cannot be resolved or created
   */
  private Path createStorageDir() throws IOException {
    Path storageDir = getStorageDir();
    FileSystem localFs = FileSystem.getLocal(getConfig());
    FsPermission ownerOnly = new FsPermission((short)0700);
    localFs.mkdirs(storageDir, ownerOnly);
    return storageDir;
  }

  /**
   * Creates the storage directory, opens the database through the
   * {@code DBManager} and starts its compaction timer.
   *
   * @throws Exception if the directory or the database cannot be set up
   */
  @Override
  protected void startInternal() throws Exception {
    Path storeRoot = createStorageDir();
    // NOTE(review): createIfMissing is disabled here; creating a missing
    // database is presumably handled inside DBManager.initDatabase, which is
    // given the version-writing callback below - confirm.
    Options dbOptions = new Options();
    dbOptions.createIfMissing(false);
    LOG.info("Using state database at " + storeRoot + " for recovery");
    File dbDir = new File(storeRoot.toString());
    db = dbManager.initDatabase(dbDir, dbOptions,
        database -> storeVersion(CURRENT_VERSION_INFO));
    String timerName = this.getClass().getSimpleName();
    dbManager.startCompactionTimer(compactionIntervalMsec, timerName);
  }

  /**
   * Closes the database through the {@code DBManager} and drops this store's
   * own reference to it.
   *
   * @throws Exception if the manager fails to close
   */
  @Override
  protected void closeInternal() throws Exception {
    dbManager.close();
    // Clear the local handle so isClosed() reports the closed state and no
    // later call can reach a database that has already been closed.
    db = null;
  }

  /**
   * @return {@code true} when this store holds no database handle
   */
  @VisibleForTesting
  boolean isClosed() {
    boolean noHandle = (db == null);
    return noHandle;
  }

  /**
   * @return the database handle held by this store, possibly {@code null}
   */
  @VisibleForTesting
  DB getDatabase() {
    return this.db;
  }

  /**
   * Loads the version stored under VERSION_NODE via the {@code DBManager}.
   *
   * @return the value returned by the manager for VERSION_NODE
   * @throws Exception if the manager fails to load it
   */
  @Override
  protected Version loadVersion() throws Exception {
    Version stored = dbManager.loadVersion(VERSION_NODE);
    return stored;
  }

  /**
   * Stores the current version of this implementation.
   *
   * @throws Exception an {@link IOException} wrapping any database failure
   */
  @Override
  protected void storeVersion() throws Exception {
    final Version current = CURRENT_VERSION_INFO;
    try {
      storeVersion(current);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Stores the given version under VERSION_NODE via the {@code DBManager}.
   *
   * @param version the version to record
   */
  protected void storeVersion(Version version) {
    this.dbManager.storeVersion(VERSION_NODE, version);
  }

  /**
   * @return the version constant of this implementation
   */
  @Override
  protected Version getCurrentVersion() {
    final Version current = CURRENT_VERSION_INFO;
    return current;
  }

  /**
   * Returns the current epoch and persists the next one.
   *
   * The current epoch is the value stored under EPOCH_NODE, or
   * {@code baseEpoch} when nothing is stored yet.
   *
   * @return the epoch in effect before the increment
   * @throws Exception an {@link IOException} wrapping any database failure,
   *         or a parse failure of the stored epoch
   */
  @Override
  public synchronized long getAndIncrementEpoch() throws Exception {
    final byte[] epochKey = bytes(EPOCH_NODE);
    long epoch = baseEpoch;
    try {
      byte[] stored = db.get(epochKey);
      if (stored != null) {
        epoch = EpochProto.parseFrom(stored).getEpoch();
      }
      EpochProto next = Epoch.newInstance(nextEpoch(epoch)).getProto();
      db.put(epochKey, next.toByteArray());
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    return epoch;
  }

  /**
   * Loads everything this store persists into a fresh {@code RMState}:
   * delegation token secret manager state, applications, AMRM token secret
   * manager state, reservations and proxy CA state, in that order.
   *
   * @return the populated state
   * @throws Exception if any part fails to load
   */
  @Override
  public RMState loadState() throws Exception {
    RMState recovered = new RMState();
    loadRMDTSecretManagerState(recovered);
    loadRMApps(recovered);
    loadAMRMTokenSecretManagerState(recovered);
    loadReservationState(recovered);
    loadProxyCAManagerState(recovered);
    return recovered;
  }

  /**
   * Loads all reservation entries into the reservation state of
   * {@code rmState}, grouped by plan name.
   *
   * Entries are read starting at the reservation key prefix until a key no
   * longer starts with it. Keys whose remainder does not split into exactly
   * a plan name and a reservation name are logged and skipped.
   *
   * @param rmState state object to populate
   * @throws IOException on database or parse failure
   */
  private void loadReservationState(RMState rmState) throws IOException {
    int recovered = 0;
    try (LeveldbIterator it = new LeveldbIterator(db)) {
      it.seek(bytes(RM_RESERVATION_KEY_PREFIX));
      while (it.hasNext()) {
        Entry<byte[],byte[]> record = it.next();
        String dbKey = asString(record.getKey());
        if (!dbKey.startsWith(RM_RESERVATION_KEY_PREFIX)) {
          break;
        }

        // What remains after the prefix must be "<plan>/<reservation>".
        String[] planAndReservation = dbKey
            .substring(RM_RESERVATION_KEY_PREFIX.length())
            .split(SEPARATOR);
        if (planAndReservation.length != 2) {
          LOG.warn("Incorrect reservation state key " + dbKey);
          continue;
        }
        String planName = planAndReservation[0];
        ReservationAllocationStateProto allocation =
            ReservationAllocationStateProto.parseFrom(record.getValue());
        if (!rmState.getReservationState().containsKey(planName)) {
          rmState.getReservationState().put(planName,
              new HashMap<ReservationId, ReservationAllocationStateProto>());
        }
        ReservationId reservationId =
            ReservationId.parseReservationId(planAndReservation[1]);
        rmState.getReservationState().get(planName)
            .put(reservationId, allocation);
        recovered++;
      }
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    LOG.info("Recovered " + recovered + " reservations");
  }

  /**
   * Loads delegation token master keys, delegation tokens and the token
   * sequence number into {@code state}, logging the recovered counts.
   *
   * @param state state object to populate
   * @throws IOException on database or decoding failure
   */
  private void loadRMDTSecretManagerState(RMState state) throws IOException {
    int keyCount = loadRMDTSecretManagerKeys(state);
    LOG.info("Recovered " + keyCount + " RM delegation token master keys");
    int tokenCount = loadRMDTSecretManagerTokens(state);
    LOG.info("Recovered " + tokenCount + " RM delegation tokens");
    loadRMDTSecretManagerTokenSequenceNumber(state);
  }

  /**
   * Loads every delegation token master key found under the master-key
   * prefix into {@code state}.
   *
   * @param state state object to populate
   * @return number of master keys loaded
   * @throws IOException on database or decoding failure
   */
  private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
    int loaded = 0;
    try (LeveldbIterator it = new LeveldbIterator(db)) {
      it.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
      while (it.hasNext()) {
        Entry<byte[],byte[]> record = it.next();
        String dbKey = asString(record.getKey());
        if (!dbKey.startsWith(RM_DT_MASTER_KEY_KEY_PREFIX)) {
          // Left the master-key key range.
          break;
        }
        DelegationKey delegationKey = loadDelegationKey(record.getValue());
        state.rmSecretManagerState.masterKeyState.add(delegationKey);
        ++loaded;
        LOG.debug("Loaded RM delegation key from {}: keyId={},"
            + " expirationDate={}", dbKey, delegationKey.getKeyId(),
            delegationKey.getExpiryDate());
      }
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    return loaded;
  }

  /**
   * Decodes a {@code DelegationKey} from its serialized bytes.
   *
   * @param data bytes previously produced by {@code DelegationKey.write}
   * @return the decoded key
   * @throws IOException if the bytes cannot be decoded
   */
  private DelegationKey loadDelegationKey(byte[] data) throws IOException {
    DelegationKey decoded = new DelegationKey();
    // The stream wraps an in-memory buffer, so closing it cannot fail.
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(data))) {
      decoded.readFields(in);
    }
    return decoded;
  }

  /**
   * Loads every delegation token found under the token prefix into
   * {@code state}, mapping each token identifier to its renew date.
   *
   * @param state state object to populate
   * @return number of tokens loaded
   * @throws IOException on database or decoding failure
   */
  private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
    int loaded = 0;
    try (LeveldbIterator it = new LeveldbIterator(db)) {
      it.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
      while (it.hasNext()) {
        Entry<byte[],byte[]> record = it.next();
        String dbKey = asString(record.getKey());
        if (!dbKey.startsWith(RM_DT_TOKEN_KEY_PREFIX)) {
          // Left the delegation-token key range.
          break;
        }
        RMDelegationTokenIdentifierData stored =
            loadDelegationToken(record.getValue());
        RMDelegationTokenIdentifier tokenId = stored.getTokenIdentifier();
        long renewDate = stored.getRenewDate();
        state.rmSecretManagerState.delegationTokenState.put(tokenId,
            renewDate);
        ++loaded;
        LOG.debug("Loaded RM delegation token from {}: tokenId={},"
            + " renewDate={}", dbKey, tokenId, renewDate);
      }
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    return loaded;
  }

  /**
   * Decodes delegation token data from its serialized bytes.
   *
   * @param data serialized token data
   * @return the decoded token data
   * @throws IOException if the bytes cannot be decoded
   */
  private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
      throws IOException {
    // The stream wraps an in-memory buffer, so closing it cannot fail.
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(data))) {
      return RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
    }
  }

  /**
   * Loads the delegation token sequence number into {@code state} when one
   * is stored; otherwise leaves {@code state} untouched.
   *
   * @param state state object to populate
   * @throws IOException on database or decoding failure
   */
  private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
      throws IOException {
    byte[] stored;
    try {
      stored = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    if (stored == null) {
      return;
    }
    // The stream wraps an in-memory buffer, so closing it cannot fail.
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(stored))) {
      state.rmSecretManagerState.dtSequenceNumber = in.readInt();
    }
  }

  /**
   * Loads every application, together with its attempts, into {@code state}.
   *
   * Entries are read starting at the application key prefix until a key no
   * longer starts with it. Each application entry is handed to
   * {@code loadRMApp} along with the iterator. A key that still contains a
   * separator after the root is logged and skipped.
   *
   * @param state state object to populate
   * @throws IOException on database or decoding failure
   */
  private void loadRMApps(RMState state) throws IOException {
    int appCount = 0;
    int attemptCount = 0;
    try (LeveldbIterator it = new LeveldbIterator(db)) {
      it.seek(bytes(RM_APP_KEY_PREFIX));
      while (it.hasNext()) {
        Entry<byte[],byte[]> record = it.next();
        String dbKey = asString(record.getKey());
        if (!dbKey.startsWith(RM_APP_KEY_PREFIX)) {
          break;
        }

        // Strip the root and the separator that follows it.
        String appIdStr = dbKey.substring(RM_APP_ROOT.length() + 1);
        if (appIdStr.indexOf(SEPARATOR) >= 0) {
          LOG.warn("Skipping extraneous data " + dbKey);
          continue;
        }

        attemptCount += loadRMApp(state, it, appIdStr, record.getValue());
        ++appCount;
      }
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    LOG.info("Recovered " + appCount + " applications and " + attemptCount
        + " application attempts");
  }

  /**
   * Loads one application and the attempt entries that follow it in the
   * iterator.
   *
   * The application state is decoded from {@code appData} and registered in
   * {@code rmState}. The iterator is then advanced over every subsequent
   * entry whose key starts with the application's key plus the separator;
   * the first entry that does not match is left unconsumed for the caller.
   *
   * @param rmState state object to populate
   * @param iter iterator shared with the caller; its next entry is the one
   *        following the application entry
   * @param appIdStr string form of the application id taken from the key
   * @param appData serialized application state
   * @return number of attempts held by the loaded application state
   * @throws IOException on decoding failure
   */
  private int loadRMApp(RMState rmState, LeveldbIterator iter, String appIdStr,
      byte[] appData) throws IOException {
    ApplicationStateData appState = createApplicationState(appIdStr, appData);
    ApplicationId appId =
        appState.getApplicationSubmissionContext().getApplicationId();
    rmState.appState.put(appId, appState);
    String attemptNodePrefix = getApplicationNodeKey(appId) + SEPARATOR;
    while (iter.hasNext()) {
      // Peek first so an entry outside this application is not consumed.
      Entry<byte[],byte[]> entry = iter.peekNext();
      String key = asString(entry.getKey());
      if (!key.startsWith(attemptNodePrefix)) {
        break;
      }

      String attemptId = key.substring(attemptNodePrefix.length());
      if (attemptId.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
        ApplicationAttemptStateData attemptState =
            createAttemptState(attemptId, entry.getValue());
        appState.attempts.put(attemptState.getAttemptId(), attemptState);
      } else {
        LOG.warn("Ignoring unknown application key: " + key);
      }
      // Consume the entry that was just examined, whether kept or ignored.
      iter.next();
    }
    int numAttempts = appState.attempts.size();
    LOG.debug("Loaded application {} with {} attempts", appId, numAttempts);
    return numAttempts;
  }

  /**
   * Decodes application state and verifies that it belongs to the expected
   * application.
   *
   * @param appIdStr string form of the application id the entry was stored
   *        under
   * @param data serialized application state
   * @return the decoded application state
   * @throws IOException if the bytes cannot be parsed
   * @throws YarnRuntimeException if the decoded state carries a different
   *         application id
   */
  private ApplicationStateData createApplicationState(String appIdStr,
      byte[] data) throws IOException {
    ApplicationId expectedId = ApplicationId.fromString(appIdStr);
    ApplicationStateDataProto proto =
        ApplicationStateDataProto.parseFrom(data);
    ApplicationStateDataPBImpl decoded = new ApplicationStateDataPBImpl(proto);
    if (expectedId.equals(
        decoded.getApplicationSubmissionContext().getApplicationId())) {
      return decoded;
    }
    throw new YarnRuntimeException("The database entry for " + expectedId
        + " contains data for "
        + decoded.getApplicationSubmissionContext().getApplicationId());
  }

  /**
   * Loads the stored state of a single application.
   *
   * @param appId the application to look up
   * @return the decoded state, or {@code null} when nothing is stored
   * @throws IOException on database or decoding failure
   */
  @VisibleForTesting
  ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
    byte[] stored;
    try {
      stored = db.get(bytes(getApplicationNodeKey(appId)));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    return stored == null
        ? null
        : createApplicationState(appId.toString(), stored);
  }

  /**
   * Loads the stored state of a single application attempt.
   *
   * @param attemptId the attempt to look up
   * @return the decoded state, or {@code null} when nothing is stored
   * @throws IOException on database or decoding failure
   */
  @VisibleForTesting
  ApplicationAttemptStateData loadRMAppAttemptState(
      ApplicationAttemptId attemptId) throws IOException {
    byte[] stored;
    try {
      stored = db.get(bytes(getApplicationAttemptNodeKey(attemptId)));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    return stored == null
        ? null
        : createAttemptState(attemptId.toString(), stored);
  }

  /**
   * Decodes application attempt state and verifies that it belongs to the
   * expected attempt.
   *
   * @param itemName string form of the attempt id the entry was stored under
   * @param data serialized attempt state
   * @return the decoded attempt state
   * @throws IOException if the bytes cannot be parsed
   * @throws YarnRuntimeException if the decoded state carries a different
   *         attempt id
   */
  private ApplicationAttemptStateData createAttemptState(String itemName,
      byte[] data) throws IOException {
    ApplicationAttemptId expectedId = ApplicationAttemptId.fromString(itemName);
    ApplicationAttemptStateDataProto proto =
        ApplicationAttemptStateDataProto.parseFrom(data);
    ApplicationAttemptStateDataPBImpl decoded =
        new ApplicationAttemptStateDataPBImpl(proto);
    if (expectedId.equals(decoded.getAttemptId())) {
      return decoded;
    }
    throw new YarnRuntimeException("The database entry for " + expectedId
        + " contains data for " + decoded.getAttemptId());
  }

  /**
   * Loads the AMRM token secret manager state into {@code rmState} when it
   * is stored; otherwise leaves {@code rmState} untouched.
   *
   * @param rmState state object to populate
   * @throws IOException on database or parse failure
   */
  private void loadAMRMTokenSecretManagerState(RMState rmState)
      throws IOException {
    try {
      byte[] stored = db.get(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT));
      if (stored == null) {
        return;
      }
      AMRMTokenSecretManagerStateProto proto =
          AMRMTokenSecretManagerStateProto.parseFrom(stored);
      AMRMTokenSecretManagerStatePBImpl decoded =
          new AMRMTokenSecretManagerStatePBImpl(proto);
      rmState.amrmTokenSecretManagerState =
          AMRMTokenSecretManagerState.newInstance(
              decoded.getCurrentMasterKey(),
              decoded.getNextMasterKey());
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Loads the proxy CA certificate and private key into {@code rmState}.
   * When either is missing a warning is logged and nothing is set.
   *
   * @param rmState state object to populate
   * @throws Exception an {@link IOException} wrapping any database failure
   */
  private void loadProxyCAManagerState(RMState rmState) throws Exception {
    byte[] certBytes;
    byte[] privateKeyBytes;
    try {
      certBytes = db.get(bytes(getProxyCACertNodeKey()));
      privateKeyBytes = db.get(bytes(getProxyCAPrivateKeyNodeKey()));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }

    boolean complete = certBytes != null && privateKeyBytes != null;
    if (!complete) {
      LOG.warn("Couldn't find Proxy CA data");
      return;
    }

    rmState.proxyCAState.setCaCert(certBytes);
    rmState.proxyCAState.setCaPrivateKey(privateKeyBytes);
  }

  /**
   * Writes the serialized application state under the application's key,
   * replacing any existing value.
   *
   * @param appId the application identifier
   * @param appStateData the state to persist
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateData) throws IOException {
    final String appKey = getApplicationNodeKey(appId);
    LOG.debug("Storing state for app {} at {}", appId, appKey);
    try {
      byte[] serialized = appStateData.getProto().toByteArray();
      db.put(bytes(appKey), serialized);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Updates application state; delegates to
   * {@code storeApplicationStateInternal}, so an update is a plain store.
   *
   * @param appId the application identifier
   * @param appStateData the state to persist
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void updateApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateData) throws IOException {
    this.storeApplicationStateInternal(appId, appStateData);
  }

  /**
   * Writes the serialized attempt state under the attempt's key, replacing
   * any existing value.
   *
   * @param attemptId the application attempt identifier
   * @param attemptStateData the state to persist
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void storeApplicationAttemptStateInternal(
      ApplicationAttemptId attemptId,
      ApplicationAttemptStateData attemptStateData) throws IOException {
    final String attemptKey = getApplicationAttemptNodeKey(attemptId);
    LOG.debug("Storing state for attempt {} at {}", attemptId, attemptKey);
    try {
      byte[] serialized = attemptStateData.getProto().toByteArray();
      db.put(bytes(attemptKey), serialized);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Updates attempt state; delegates to
   * {@code storeApplicationAttemptStateInternal}, so an update is a plain
   * store.
   *
   * @param attemptId the application attempt identifier
   * @param attemptStateData the state to persist
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void updateApplicationAttemptStateInternal(
      ApplicationAttemptId attemptId,
      ApplicationAttemptStateData attemptStateData) throws IOException {
    this.storeApplicationAttemptStateInternal(attemptId, attemptStateData);
  }

  /**
   * Deletes the entry stored for an application attempt.
   *
   * @param attemptId the attempt whose entry is removed
   * @throws IOException wrapping any database failure
   */
  @Override
  public synchronized void removeApplicationAttemptInternal(
      ApplicationAttemptId attemptId)
      throws IOException {
    final String key = getApplicationAttemptNodeKey(attemptId);
    LOG.debug("Removing state for attempt {} at {}", attemptId, key);
    try {
      db.delete(bytes(key));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Deletes an application's entry and the entries of all attempts listed
   * in {@code appState}, submitted as a single write batch.
   *
   * @param appState state of the application to remove
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void removeApplicationStateInternal(ApplicationStateData appState)
      throws IOException {
    ApplicationId appId =
        appState.getApplicationSubmissionContext().getApplicationId();
    String appKey = getApplicationNodeKey(appId);
    try (WriteBatch deletions = db.createWriteBatch()) {
      deletions.delete(bytes(appKey));
      for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
        deletions.delete(
            bytes(getApplicationAttemptNodeKey(appKey, attemptId)));
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing state for app " + appId + " and "
            + appState.attempts.size() + " attempts" + " at " + appKey);
      }
      db.write(deletions);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Writes a reservation's allocation state under its plan/reservation key.
   *
   * @param reservationAllocation the allocation state to persist
   * @param planName name of the plan owning the reservation
   * @param reservationIdName string form of the reservation id
   * @throws Exception an {@link IOException} wrapping any database failure
   */
  @Override
  protected void storeReservationState(
      ReservationAllocationStateProto reservationAllocation, String planName,
      String reservationIdName) throws Exception {
    try (WriteBatch writes = db.createWriteBatch()) {
      String reservationKey =
          getReservationNodeKey(planName, reservationIdName);
      LOG.debug("Storing state for reservation {} plan {} at {}",
          reservationIdName, planName, reservationKey);

      writes.put(bytes(reservationKey), reservationAllocation.toByteArray());
      db.write(writes);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Deletes the entry stored for a reservation.
   *
   * @param planName name of the plan owning the reservation
   * @param reservationIdName string form of the reservation id
   * @throws Exception an {@link IOException} wrapping any database failure
   */
  @Override
  protected void removeReservationState(String planName,
      String reservationIdName) throws Exception {
    try (WriteBatch deletions = db.createWriteBatch()) {
      String key = getReservationNodeKey(planName, reservationIdName);
      deletions.delete(bytes(key));
      LOG.debug("Removing state for reservation {} plan {} at {}",
          reservationIdName, planName, key);
      db.write(deletions);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Writes a delegation token and, for a new token only, the sequence
   * number as well. Both writes go into one batch.
   *
   * @param tokenId the token identifier
   * @param renewDate the token's renew date
   * @param isUpdate {@code true} to rewrite an existing token without
   *        touching the stored sequence number
   * @throws IOException wrapping any database failure
   */
  private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
      Long renewDate, boolean isUpdate) throws IOException {
    final String tokenKey = getRMDTTokenNodeKey(tokenId);
    final RMDelegationTokenIdentifierData tokenData =
        new RMDelegationTokenIdentifierData(tokenId, renewDate);
    LOG.debug("Storing token to {}", tokenKey);
    try (WriteBatch writes = db.createWriteBatch()) {
      writes.put(bytes(tokenKey), tokenData.toByteArray());
      if (!isUpdate) {
        // Sequence number is serialized with DataOutputStream.writeInt.
        ByteArrayOutputStream seqBytes = new ByteArrayOutputStream();
        try (DataOutputStream seqOut = new DataOutputStream(seqBytes)) {
          seqOut.writeInt(tokenId.getSequenceNumber());
        }
        LOG.debug("Storing {} to {}", tokenId.getSequenceNumber(),
            RM_DT_SEQUENCE_NUMBER_KEY);
        writes.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), seqBytes.toByteArray());
      }
      db.write(writes);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Stores a new delegation token along with its sequence number.
   *
   * @param tokenId the token identifier
   * @param renewDate the token's renew date
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void storeRMDelegationTokenState(
      RMDelegationTokenIdentifier tokenId, Long renewDate)
      throws IOException {
    final boolean isUpdate = false;
    storeOrUpdateRMDT(tokenId, renewDate, isUpdate);
  }

  /**
   * Rewrites an existing delegation token; the stored sequence number is
   * not touched.
   *
   * @param tokenId the token identifier
   * @param renewDate the token's renew date
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void updateRMDelegationTokenState(
      RMDelegationTokenIdentifier tokenId, Long renewDate)
      throws IOException {
    final boolean isUpdate = true;
    storeOrUpdateRMDT(tokenId, renewDate, isUpdate);
  }

  /**
   * Deletes the entry stored for a delegation token.
   *
   * @param tokenId the token whose entry is removed
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void removeRMDelegationTokenState(
      RMDelegationTokenIdentifier tokenId) throws IOException {
    final String key = getRMDTTokenNodeKey(tokenId);
    LOG.debug("Removing token at {}", key);
    try {
      db.delete(bytes(key));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Serializes a delegation token master key and writes it under its key.
   *
   * @param masterKey the master key to persist
   * @throws IOException on serialization failure, or wrapping any database
   *         failure
   */
  @Override
  protected void storeRMDTMasterKeyState(DelegationKey masterKey)
      throws IOException {
    final String key = getRMDTMasterKeyNodeKey(masterKey);
    LOG.debug("Storing token master key to {}", key);
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream keyOut = new DataOutputStream(buffer)) {
      masterKey.write(keyOut);
    }
    try {
      db.put(bytes(key), buffer.toByteArray());
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Deletes the entry stored for a delegation token master key.
   *
   * @param masterKey the master key whose entry is removed
   * @throws IOException wrapping any database failure
   */
  @Override
  protected void removeRMDTMasterKeyState(DelegationKey masterKey)
      throws IOException {
    final String key = getRMDTMasterKeyNodeKey(masterKey);
    LOG.debug("Removing token master key at {}", key);
    try {
      db.delete(bytes(key));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Writes the AMRM token secret manager state under its root key.
   * {@code isUpdate} is not consulted: store and update perform the same
   * put.
   *
   * Unlike most writers in this class, this method does not catch
   * {@code DBException}, so a database failure propagates unwrapped.
   *
   * @param state the state to persist
   * @param isUpdate unused
   */
  @Override
  public void storeOrUpdateAMRMTokenSecretManagerState(
      AMRMTokenSecretManagerState state, boolean isUpdate) {
    AMRMTokenSecretManagerState copy =
        AMRMTokenSecretManagerState.newInstance(state);
    byte[] serialized = copy.getProto().toByteArray();
    byte[] key = bytes(AMRMTOKEN_SECRET_MANAGER_ROOT);
    db.put(key, serialized);
  }

  /**
   * Writes the encoded proxy CA certificate and private key in one batch.
   *
   * @param caCert the CA certificate
   * @param caPrivateKey the CA private key
   * @throws Exception on encoding failure, or an {@link IOException}
   *         wrapping any database failure
   */
  @Override
  protected void storeProxyCACertState(
      X509Certificate caCert, PrivateKey caPrivateKey) throws Exception {
    final byte[] certBytes = caCert.getEncoded();
    final byte[] privateKeyBytes = caPrivateKey.getEncoded();

    final String certKey = getProxyCACertNodeKey();
    final String privateKeyKey = getProxyCAPrivateKeyNodeKey();

    try (WriteBatch writes = db.createWriteBatch()) {
      writes.put(bytes(certKey), certBytes);
      writes.put(bytes(privateKeyKey), privateKeyBytes);
      db.write(writes);
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Closes the database if one is open and recursively deletes the database
   * directory from the local file system.
   *
   * @throws IOException if the store path is not configured, or closing or
   *         deleting fails
   */
  @Override
  public void deleteStore() throws IOException {
    Path root = getStorageDir();
    LOG.info("Deleting state database at " + root);
    // Guard against a store that was never started or was already deleted;
    // an unconditional close would fail with a NullPointerException.
    if (db != null) {
      db.close();
      db = null;
    }
    FileSystem fs = FileSystem.getLocal(getConfig());
    fs.delete(root, true);
  }

  /**
   * Deletes the entry stored for an application. Only the application's own
   * key is deleted here.
   *
   * @param removeAppId the application whose entry is removed
   * @throws IOException wrapping any database failure
   */
  @Override
  public synchronized void removeApplication(ApplicationId removeAppId)
      throws IOException {
    final String key = getApplicationNodeKey(removeAppId);
    LOG.info("Removing state for app " + removeAppId);
    try {
      db.delete(bytes(key));
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
  }

  /**
   * Counts all entries in the database, logging each key at INFO level.
   *
   * @return total number of entries
   * @throws IOException wrapping any database failure
   */
  @VisibleForTesting
  int getNumEntriesInDatabase() throws IOException {
    int count = 0;
    try (LeveldbIterator it = new LeveldbIterator(db)) {
      for (it.seekToFirst(); it.hasNext(); ++count) {
        Entry<byte[], byte[]> record = it.next();
        LOG.info("entry: " + asString(record.getKey()));
      }
    } catch (DBException dbe) {
      throw new IOException(dbe);
    }
    return count;
  }

  /**
   * Replaces the {@code DBManager} used by this store.
   *
   * @param dbManager the manager to use from now on
   */
  @VisibleForTesting
  protected void setDbManager(DBManager dbManager) {
    DBManager replacement = dbManager;
    this.dbManager = replacement;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop FileSystemRMStateStore 源码

hadoop MemoryRMStateStore 源码

hadoop NullRMStateStore 源码

hadoop RMStateStore 源码

hadoop RMStateStoreAMRMTokenEvent 源码

hadoop RMStateStoreAppAttemptEvent 源码

hadoop RMStateStoreAppEvent 源码

hadoop RMStateStoreEvent 源码

hadoop RMStateStoreEventType 源码

hadoop RMStateStoreFactory 源码

0  赞