hadoop StoragePolicySatisfier source code

  • 2022-10-20

Hadoop StoragePolicySatisfier code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.sps;

import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

/**
 * Setting a storage policy on a file after it has been written only updates
 * the new storage policy type in the Namespace; the physical block movement
 * does not happen until the user explicitly runs the "Mover" tool for such
 * files. The StoragePolicySatisfier daemon thread addresses the case where
 * users want the blocks moved by a dedicated daemon (which can run inside the
 * Namenode or stand-alone) instead of running the Mover tool explicitly.
 * Simply calling the client API satisfyStoragePolicy on a file or directory
 * triggers the movement of its physical storage locations asynchronously.
 * SPS picks the file blocks whose storages need to change, builds the mapping
 * from source block locations to the expected storage types and target
 * locations, and then prepares the requests sent to the Datanodes that carry
 * out the physical block movements.
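 * <p>
 * A minimal client-side sketch of triggering SPS (the path below is
 * hypothetical, and it assumes SPS has been enabled via
 * dfs.storage.policy.satisfier.mode):
 * <pre>
 * DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
 * Path file = new Path("/data/app.log");  // hypothetical path
 * dfs.setStoragePolicy(file, "COLD");     // only updates the namespace
 * dfs.satisfyStoragePolicy(file);         // queues asynchronous block movement
 * </pre>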
 */
@InterfaceAudience.Private
public class StoragePolicySatisfier implements SPSService, Runnable {
  public static final Logger LOG =
      LoggerFactory.getLogger(StoragePolicySatisfier.class);
  private Daemon storagePolicySatisfierThread;
  private BlockStorageMovementNeeded storageMovementNeeded;
  private BlockStorageMovementAttemptedItems storageMovementsMonitor;
  private volatile boolean isRunning = false;
  private int spsWorkMultiplier;
  private long blockCount = 0L;
  private int blockMovementMaxRetry;
  private Context ctxt;
  private final Configuration conf;
  private DatanodeCacheManager dnCacheMgr;

  public StoragePolicySatisfier(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Represents the collective analysis status for all blocks.
   */
  private static class BlocksMovingAnalysis {

    enum Status {
      // Represents that the analysis was skipped due to some condition, for
      // example when the block collection is in an incomplete state.
      ANALYSIS_SKIPPED_FOR_RETRY,
      // Represents that some or all blocks found a respective target for the
      // storage movement.
      BLOCKS_TARGETS_PAIRED,
      // Represents that none of the blocks found a respective target for the
      // storage movement.
      NO_BLOCKS_TARGETS_PAIRED,
      // Represents that no blocks were found that need storage movement.
      BLOCKS_ALREADY_SATISFIED,
      // Represents that the analysis was skipped due to some condition, for
      // example when no blocks exist in the block collection, or when no
      // analysis is required for EC files with unsuitable storage policies.
      BLOCKS_TARGET_PAIRING_SKIPPED,
      // Represents that all the reported blocks satisfy the policy, but some
      // of the blocks are low redundant.
      FEW_LOW_REDUNDANCY_BLOCKS,
      // Represents movement failures due to unexpected errors.
      BLOCKS_FAILED_TO_MOVE
    }

    private Status status = null;
    private Map<Block, Set<StorageTypeNodePair>> assignedBlocks = null;

    BlocksMovingAnalysis(Status status,
        Map<Block, Set<StorageTypeNodePair>> assignedBlocks) {
      this.status = status;
      this.assignedBlocks = assignedBlocks;
    }
  }

  public void init(final Context context) {
    this.ctxt = context;
    this.storageMovementNeeded = new BlockStorageMovementNeeded(context);
    this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
        this, storageMovementNeeded, context);
    this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
    this.blockMovementMaxRetry = getConf().getInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
  }

  /**
   * Start the storage policy satisfier daemon thread. Also start the block
   * storage movements monitor to retry the attempts if needed.
   */
  @Override
  public synchronized void start(StoragePolicySatisfierMode serviceMode) {
    if (serviceMode == StoragePolicySatisfierMode.NONE) {
      LOG.error("Can't start StoragePolicySatisfier for the given mode:{}",
          serviceMode);
      return;
    }
    LOG.info("Starting {} StoragePolicySatisfier.",
        StringUtils.toLowerCase(serviceMode.toString()));
    isRunning = true;
    storagePolicySatisfierThread = new Daemon(this);
    storagePolicySatisfierThread.setName("StoragePolicySatisfier");
    storagePolicySatisfierThread.start();
    this.storageMovementsMonitor.start();
    this.storageMovementNeeded.activate();
    dnCacheMgr = new DatanodeCacheManager(conf);
  }

  @Override
  public synchronized void stop(boolean forceStop) {
    isRunning = false;
    if (storagePolicySatisfierThread == null) {
      return;
    }

    storageMovementNeeded.close();

    storagePolicySatisfierThread.interrupt();
    this.storageMovementsMonitor.stop();
    if (forceStop) {
      storageMovementNeeded.clearQueuesWithNotification();
    } else {
      LOG.info("Stopping StoragePolicySatisfier.");
    }
  }

  @Override
  public synchronized void stopGracefully() {
    if (isRunning) {
      stop(false);
    }

    if (this.storageMovementsMonitor != null) {
      this.storageMovementsMonitor.stopGracefully();
    }

    if (storagePolicySatisfierThread != null) {
      try {
        storagePolicySatisfierThread.join(3000);
      } catch (InterruptedException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Interrupted Exception while waiting to join sps thread,"
              + " ignoring it", ie);
        }
      }
    }
  }

  @Override
  public boolean isRunning() {
    return isRunning;
  }

  @Override
  public void run() {
    while (isRunning) {
      // Check if dependent service is running
      if (!ctxt.isRunning()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Upstream service is down, skipping the sps work.");
        }
        continue;
      }
      ItemInfo itemInfo = null;
      try {
        boolean retryItem = false;
        if (!ctxt.isInSafeMode()) {
          itemInfo = storageMovementNeeded.get();
          if (itemInfo != null) {
            if (itemInfo.getRetryCount() >= blockMovementMaxRetry) {
              LOG.info("Failed to satisfy the policy after "
                  + blockMovementMaxRetry + " retries. Removing inode "
                  + itemInfo.getFile() + " from the queue");
              storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
              continue;
            }
            long trackId = itemInfo.getFile();
            BlocksMovingAnalysis status = null;
            BlockStoragePolicy existingStoragePolicy;
            // TODO: presently, context internally acquire the lock
            // and returns the result. Need to discuss to move the lock outside?
            HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
            // Check path existence.
            if (fileStatus == null || fileStatus.isDir()) {
              // The file doesn't exist (maybe it was deleted) or it is a
              // directory; just remove the trackId from the queue.
              storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
            } else {
              byte existingStoragePolicyID = fileStatus.getStoragePolicy();
              existingStoragePolicy = ctxt
                  .getStoragePolicy(existingStoragePolicyID);

              HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
              status = analyseBlocksStorageMovementsAndAssignToDN(file,
                  existingStoragePolicy);
              switch (status.status) {
              // Just add to monitor, so it will be retried after timeout
              case ANALYSIS_SKIPPED_FOR_RETRY:
                // Just add to monitor, so it will be tracked for report and
                // be removed on storage movement attempt finished report.
              case BLOCKS_TARGETS_PAIRED:
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Block analysis status:{} for the file id:{}."
                      + " Adding to attempt monitor queue for the storage "
                      + "movement attempt finished report",
                      status.status, fileStatus.getFileId());
                }
                this.storageMovementsMonitor.add(itemInfo.getStartPath(),
                    itemInfo.getFile(), monotonicNow(), status.assignedBlocks,
                    itemInfo.getRetryCount());
                break;
              case NO_BLOCKS_TARGETS_PAIRED:
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Adding trackID:{} for the file id:{} back to"
                      + " retry queue as none of the blocks found its eligible"
                      + " targets.", trackId, fileStatus.getFileId());
                }
                retryItem = true;
                break;
              case FEW_LOW_REDUNDANCY_BLOCKS:
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Adding trackID:{} for the file id:{} back to "
                      + "retry queue as some of the blocks are low redundant.",
                      trackId, fileStatus.getFileId());
                }
                retryItem = true;
                break;
              case BLOCKS_FAILED_TO_MOVE:
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Adding trackID:{} for the file id:{} back to "
                      + "retry queue as some of the blocks movement failed.",
                      trackId, fileStatus.getFileId());
                }
                retryItem = true;
                break;
              // Just clean Xattrs
              case BLOCKS_TARGET_PAIRING_SKIPPED:
              case BLOCKS_ALREADY_SATISFIED:
              default:
                LOG.info("Block analysis status:{} for the file id:{}."
                    + " So, Cleaning up the Xattrs.", status.status,
                    fileStatus.getFileId());
                storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
                break;
              }
            }
          }
        } else {
          LOG.info("Namenode is in safemode. It will retry again.");
          Thread.sleep(3000);
        }
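        // Throttle the scheduling work: sleep when nothing is queued, or
        // once more than (live datanodes * spsWorkMultiplier) block moves
        // have been scheduled in this window, then reset the counter.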
        int numLiveDn = ctxt.getNumLiveDataNodes();
        if (storageMovementNeeded.size() == 0
            || blockCount > (numLiveDn * spsWorkMultiplier)) {
          Thread.sleep(3000);
          blockCount = 0L;
        }
        if (retryItem) {
          this.storageMovementNeeded.add(itemInfo);
        }
      } catch (IOException e) {
        LOG.error("Exception during StoragePolicySatisfier execution - "
            + "will continue next cycle", e);
        // Since it could not finish this item in previous iteration due to IOE,
        // just try again.
        this.storageMovementNeeded.add(itemInfo);
      } catch (Throwable t) {
        synchronized (this) {
          if (isRunning) {
            isRunning = false;
            if (t instanceof InterruptedException) {
              LOG.info("Stopping StoragePolicySatisfier.", t);
            } else {
              LOG.error("StoragePolicySatisfier thread received "
                  + "runtime exception.", t);
            }
            // Stopping monitor thread and clearing queues as well
            this.clearQueues();
            this.storageMovementsMonitor.stopGracefully();
          }
        }
      }
    }
  }

  private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
      HdfsLocatedFileStatus fileInfo,
      BlockStoragePolicy existingStoragePolicy) throws IOException {
    BlocksMovingAnalysis.Status status =
        BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
    final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy();
    final LocatedBlocks locatedBlocks = fileInfo.getLocatedBlocks();
    final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
    if (!lastBlkComplete) {
      // Postpone: the file is currently under construction.
      LOG.info("File: {} is under construction, so postponing"
          + " it to the next retry iteration", fileInfo.getFileId());
      return new BlocksMovingAnalysis(
          BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
          new HashMap<>());
    }

    List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
    if (blocks.size() == 0) {
      LOG.info("File: {} has no blocks,"
          + " so skipping the analysis.", fileInfo.getFileId());
      return new BlocksMovingAnalysis(
          BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
          new HashMap<>());
    }
    List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
    boolean hasLowRedundancyBlocks = false;
    int replication = fileInfo.getReplication();
    DatanodeMap liveDns = dnCacheMgr.getLiveDatanodeStorageReport(ctxt);
    for (int i = 0; i < blocks.size(); i++) {
      LocatedBlock blockInfo = blocks.get(i);

      // A block is considered low redundancy when the length of its block
      // locations array is less than the expected replication factor. If any
      // block is low redundant, hasLowRedundancyBlocks is marked true.
      hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication,
          ecPolicy);

      List<StorageType> expectedStorageTypes;
      if (blockInfo.isStriped()) {
        if (ErasureCodingPolicyManager
            .checkStoragePolicySuitableForECStripedMode(
                existingStoragePolicy.getId())) {
          expectedStorageTypes = existingStoragePolicy
              .chooseStorageTypes((short) blockInfo.getLocations().length);
        } else {
          // Currently only a limited set of policies (HOT, COLD, ALLSSD) is
          // supported for EC striped mode files. SPS skips moving the blocks
          // if the storage policy is not one of the supported EC striped
          // mode policies.
          LOG.warn("The storage policy " + existingStoragePolicy.getName()
              + " is not suitable for striped EC files. "
              + "So, skipping the block movement");
          return new BlocksMovingAnalysis(
              BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
              new HashMap<>());
        }
      } else {
        expectedStorageTypes = existingStoragePolicy
            .chooseStorageTypes(fileInfo.getReplication());
      }

      List<StorageType> existing = new LinkedList<StorageType>(
          Arrays.asList(blockInfo.getStorageTypes()));
      if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
          existing, true)) {
        boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
            blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
            liveDns, ecPolicy);
        if (blocksPaired) {
          status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
        } else if (status !=
            BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
          // Check whether a previous block was successfully paired. The
          // status is set to NO_BLOCKS_TARGETS_PAIRED only when none of the
          // blocks of the file found eligible targets to satisfy the storage
          // policy.
          status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
        }
      }
    }

    // If no block was paired and some blocks are low redundant, mark the
    // status as FEW_LOW_REDUNDANCY_BLOCKS.
    if (hasLowRedundancyBlocks
        && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
      status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
    }
    Map<Block, Set<StorageTypeNodePair>> assignedBlocks = new HashMap<>();
    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
      // Submit each chosen block storage movement task.
      try {
        ctxt.submitMoveTask(blkMovingInfo);
        LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
        StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
            blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
        Set<StorageTypeNodePair> nodesWithStorage = assignedBlocks
            .get(blkMovingInfo.getBlock());
        if (nodesWithStorage == null) {
          nodesWithStorage = new HashSet<>();
          assignedBlocks.put(blkMovingInfo.getBlock(), nodesWithStorage);
        }
        nodesWithStorage.add(nodeStorage);
        blockCount++;
      } catch (IOException e) {
        LOG.warn("Exception while scheduling movement task", e);
        // failed to move the block.
        status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
      }
    }
    return new BlocksMovingAnalysis(status, assignedBlocks);
  }

  /**
   * The given block is considered low redundancy when the number of block
   * locations is less than the expected replication factor. For EC blocks,
   * redundancy is the sum of data and parity blocks.
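   * For example, a replicated block with replication factor 3 but only two
   * reported locations, or an RS-6-3 striped block group reporting only
   * eight of its nine internal blocks, is treated as low redundant.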
   *
   * @param blockInfo
   *          block
   * @param replication
   *          replication factor of the given file block
   * @param ecPolicy
   *          erasure coding policy of the given file block
   * @return true if the given block is low redundant.
   */
  private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication,
      ErasureCodingPolicy ecPolicy) {
    if (blockInfo.isStriped()) {
      // For EC blocks, redundancy is the sum of data and parity blocks.
      replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
    }
    // The block is low redundant when the number of block locations is less
    // than the expected replication factor.
    return blockInfo.getLocations().length < replication;
  }

  /**
   * Compute the list of block moving information corresponding to the given
   * blockId. This checks whether each location of the given block satisfies
   * the expected storage policy; if a block location does not satisfy the
   * policy, it finds a target node with the expected storage type to satisfy
   * the storage policy.
   *
   * @param blockMovingInfos
   *          - list of block source and target node pairs
   * @param blockInfo
   *          - block details
   * @param expectedStorageTypes
   *          - list of expected storage types to satisfy the storage policy
   * @param existing
   *          - list of existing storage types
   * @param storages
   *          - available storages
   * @param liveDns
   *          - live datanodes which can be used as targets
   * @param ecPolicy
   *          - EC policy of the file for which SPS was invoked
   * @return false if some of the block locations failed to find a target node
   *         to satisfy the storage policy, true otherwise
   */
  private boolean computeBlockMovingInfos(
      List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
      List<StorageType> expectedStorageTypes, List<StorageType> existing,
      DatanodeInfo[] storages, DatanodeMap liveDns,
      ErasureCodingPolicy ecPolicy) {
    boolean foundMatchingTargetNodesForBlock = true;
    if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
        existing, true)) {
      List<StorageTypeNodePair> sourceWithStorageMap =
          new ArrayList<StorageTypeNodePair>();
      List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
          Arrays.asList(storages));

      // Add existing storages into the exclude nodes list to avoid choosing
      // them as remote targets later.
      List<DatanodeInfo> excludeNodes = new ArrayList<>(existingBlockStorages);

      // If an expected type already exists on a source node, a local movement
      // is possible, so let's find such sources first.
      Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
      while (iterator.hasNext()) {
        DatanodeInfoWithStorage dnInfo = (DatanodeInfoWithStorage) iterator
            .next();
        if (checkSourceAndTargetTypeExists(dnInfo, existing,
            expectedStorageTypes, liveDns)) {
          sourceWithStorageMap
              .add(new StorageTypeNodePair(dnInfo.getStorageType(), dnInfo));
          iterator.remove();
          existing.remove(dnInfo.getStorageType());
        }
      }

      // Find sources for the remaining existing types.
      for (StorageType existingType : existing) {
        iterator = existingBlockStorages.iterator();
        while (iterator.hasNext()) {
          DatanodeInfoWithStorage dnStorageInfo =
              (DatanodeInfoWithStorage) iterator.next();
          StorageType storageType = dnStorageInfo.getStorageType();
          if (storageType == existingType) {
            iterator.remove();
            sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
                dnStorageInfo));
            break;
          }
        }
      }

      EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns =
          findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns);

      foundMatchingTargetNodesForBlock = findSourceAndTargetToMove(
          blockMovingInfos, blockInfo, sourceWithStorageMap,
          expectedStorageTypes, targetDns,
          ecPolicy, excludeNodes);
    }
    return foundMatchingTargetNodesForBlock;
  }

  /**
   * Find a good target node for each source node whose block storage was
   * misplaced.
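   * Targets are chosen nearest-first: first another storage on the same
   * datanode, then a node within the same node group, then the same rack,
   * and finally any other node in the cluster.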
   *
   * @param blockMovingInfos
   *          - list of block source and target node pairs
   * @param blockInfo
   *          - block
   * @param sourceWithStorageList
   *          - source datanodes with their storages
   * @param expectedTypes
   *          - storage types expected after the move
   * @param targetDns
   *          - available datanodes for the expected storage types
   * @param ecPolicy
   *          - erasure coding policy of the file for which SPS was invoked
   * @param excludeNodes
   *          - existing source nodes which hold a replica copy
   * @return false if some of the block locations failed to find a target node
   *         to satisfy the storage policy
   */
  private boolean findSourceAndTargetToMove(
      List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
      List<StorageTypeNodePair> sourceWithStorageList,
      List<StorageType> expectedTypes,
      EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
      ErasureCodingPolicy ecPolicy, List<DatanodeInfo> excludeNodes) {
    boolean foundMatchingTargetNodesForBlock = true;

    // Loop over all the source node locations and choose a target storage
    // within the same node if possible. This is done separately to avoid
    // choosing a target which already has this block.
    for (int i = 0; i < sourceWithStorageList.size(); i++) {
      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);

      // Check whether the block replica is already placed in the expected
      // storage type in this source datanode.
      if (!expectedTypes.contains(existingTypeNodePair.storageType)) {
        StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
            existingTypeNodePair.dn, targetDns, expectedTypes);
        if (chosenTarget != null) {
          if (blockInfo.isStriped()) {
            buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
                existingTypeNodePair.storageType, chosenTarget.dn,
                chosenTarget.storageType, blockMovingInfos,
                ecPolicy);
          } else {
            buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
                existingTypeNodePair.storageType, chosenTarget.dn,
                chosenTarget.storageType, blockMovingInfos);
          }
          expectedTypes.remove(chosenTarget.storageType);
        }
      }
    }
    // If all the sources and targets were paired within the same node, then
    // simply return.
    if (expectedTypes.isEmpty()) {
      return foundMatchingTargetNodesForBlock;
    }
    // Loop over all the source node locations and choose a remote target
    // storage node when none was found within the same node.
    for (int i = 0; i < sourceWithStorageList.size(); i++) {
      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
      StorageTypeNodePair chosenTarget = null;
      // A target storage was already chosen within this datanode, so just
      // skip this source node.
      if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
        continue;
      }
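      // First, try to match a node within the same node group (only when the
      // cluster topology is node-group aware).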
      if (chosenTarget == null && dnCacheMgr.getCluster().isNodeGroupAware()) {
        chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
            expectedTypes, Matcher.SAME_NODE_GROUP, targetDns,
            excludeNodes);
      }

      // Then, match nodes on the same rack
      if (chosenTarget == null) {
        chosenTarget =
            chooseTarget(blockInfo, existingTypeNodePair.dn, expectedTypes,
                Matcher.SAME_RACK, targetDns, excludeNodes);
      }

      if (chosenTarget == null) {
        chosenTarget =
            chooseTarget(blockInfo, existingTypeNodePair.dn, expectedTypes,
                Matcher.ANY_OTHER, targetDns, excludeNodes);
      }
      if (null != chosenTarget) {
        if (blockInfo.isStriped()) {
          buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
              existingTypeNodePair.storageType, chosenTarget.dn,
              chosenTarget.storageType, blockMovingInfos, ecPolicy);
        } else {
          buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
              existingTypeNodePair.storageType, chosenTarget.dn,
              chosenTarget.storageType, blockMovingInfos);
        }

        expectedTypes.remove(chosenTarget.storageType);
        excludeNodes.add(chosenTarget.dn);
      } else {
        LOG.warn(
            "Failed to choose target datanode for the required"
                + " storage types {}, block:{}, existing storage type:{}",
            expectedTypes, blockInfo, existingTypeNodePair.storageType);
      }
    }

    if (expectedTypes.size() > 0) {
      foundMatchingTargetNodesForBlock = false;
    }

    return foundMatchingTargetNodesForBlock;
  }

  private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
      DatanodeInfo dn) {
    for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
      if (blockMovingInfo.getSource().equals(dn)) {
        return true;
      }
    }
    return false;
  }

  private void buildContinuousBlockMovingInfos(LocatedBlock blockInfo,
      DatanodeInfo sourceNode, StorageType sourceStorageType,
      DatanodeInfo targetNode, StorageType targetStorageType,
      List<BlockMovingInfo> blkMovingInfos) {
    Block blk = ExtendedBlock.getLocalBlock(blockInfo.getBlock());
    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
        targetNode, sourceStorageType, targetStorageType);
    blkMovingInfos.add(blkMovingInfo);
  }

  private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
      DatanodeInfo sourceNode, StorageType sourceStorageType,
      DatanodeInfo targetNode, StorageType targetStorageType,
      List<BlockMovingInfo> blkMovingInfos, ErasureCodingPolicy ecPolicy) {
    // For a striped block, the internal block at the given index of the
    // block group needs to be constructed. Iterate over all the block
    // indices and construct the internal blocks, which can then be
    // considered for block movement.
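    // For example, since the low bits of a striped block group id encode the
    // internal block index, a group with id 0xB0 yields internal block id
    // 0xB0 + 3 = 0xB3 at index 3.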
    LocatedStripedBlock sBlockInfo = (LocatedStripedBlock) blockInfo;
    byte[] indices = sBlockInfo.getBlockIndices();
    DatanodeInfo[] locations = sBlockInfo.getLocations();
    for (int i = 0; i < indices.length; i++) {
      byte blkIndex = indices[i];
      if (blkIndex >= 0) {
        // pick block movement only for the given source node.
        if (sourceNode.equals(locations[i])) {
          // construct internal block
          ExtendedBlock extBlock = sBlockInfo.getBlock();
          long numBytes = StripedBlockUtil.getInternalBlockLength(
              extBlock.getNumBytes(), ecPolicy, blkIndex);
          Block blk = new Block(ExtendedBlock.getLocalBlock(extBlock));
          long blkId = blk.getBlockId() + blkIndex;
          blk.setBlockId(blkId);
          blk.setNumBytes(numBytes);
          BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
              targetNode, sourceStorageType, targetStorageType);
          blkMovingInfos.add(blkMovingInfo);
        }
      }
    }
  }

  /**
   * Choose the target storage within the same datanode if possible.
   *
   * @param blockInfo
   *          - block info
   * @param source
   *          - source datanode
   * @param targetDns
   *          - set of target datanodes with their respective storage types
   * @param targetTypes
   *          - list of target storage types
   * @return a storage type and node pair, or null if no storage with
   *         sufficient space was found on the source datanode
   */
  private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
      DatanodeInfo source,
      EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns,
      List<StorageType> targetTypes) {
    for (StorageType t : targetTypes) {
      List<DatanodeWithStorage.StorageDetails> targetNodeStorages =
          targetDns.get(t);
      if (targetNodeStorages == null) {
        continue;
      }
      for (DatanodeWithStorage.StorageDetails targetNode : targetNodeStorages) {
        if (targetNode.getDatanodeInfo().equals(source)) {
          // Good target with enough space to write the given block size.
          if (targetNode.hasSpaceForScheduling(blockInfo.getBlockSize())) {
            targetNode.incScheduledSize(blockInfo.getBlockSize());
            return new StorageTypeNodePair(t, source);
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Datanode:{} storage type:{} doesn't have sufficient "
                + "space:{} to move the target block size:{}",
                source, t, targetNode, blockInfo.getBlockSize());
          }
        }
      }
    }
    return null;
  }

  private StorageTypeNodePair chooseTarget(LocatedBlock block,
      DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher,
      EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
      locsForExpectedStorageTypes, List<DatanodeInfo> excludeNodes) {
    for (StorageType t : targetTypes) {
      List<DatanodeWithStorage.StorageDetails> nodesWithStorages =
          locsForExpectedStorageTypes.get(t);
      if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
        continue; // no target nodes with the required storage type.
      }
      Collections.shuffle(nodesWithStorages);
      for (DatanodeWithStorage.StorageDetails targetNode : nodesWithStorages) {
        DatanodeInfo target = targetNode.getDatanodeInfo();
        if (!excludeNodes.contains(target)
            && matcher.match(dnCacheMgr.getCluster(), source, target)) {
          // Good target with enough space to write the given block size.
          if (targetNode.hasSpaceForScheduling(block.getBlockSize())) {
            targetNode.incScheduledSize(block.getBlockSize());
            return new StorageTypeNodePair(t, target);
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Datanode:{} storage type:{} doesn't have sufficient "
                + "space:{} to move the target block size:{}",
                target, t, targetNode, block.getBlockSize());
          }
        }
      }
    }
    return null;
  }

  /**
   * Keeps datanode with its respective storage type.
   */
  static final class StorageTypeNodePair {
    private final StorageType storageType;
    private final DatanodeInfo dn;

    StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
      this.storageType = storageType;
      this.dn = dn;
    }

    public DatanodeInfo getDatanodeInfo() {
      return dn;
    }

    public StorageType getStorageType() {
      return storageType;
    }

    @Override
    public String toString() {
      return new StringBuilder().append("StorageTypeNodePair(\n  ")
          .append("DatanodeInfo: ").append(dn).append(", StorageType: ")
          .append(storageType).toString();
    }
  }

  private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>
      findTargetsForExpectedStorageTypes(List<StorageType> expected,
        DatanodeMap liveDns) {
    EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetsMap =
        new EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>>(
        StorageType.class);

    for (StorageType storageType : expected) {
      List<DatanodeWithStorage> nodes = liveDns.getTarget(storageType);
      if (nodes == null) {
        return targetsMap;
      }
      List<DatanodeWithStorage.StorageDetails> listNodes = targetsMap
          .get(storageType);
      if (listNodes == null) {
        listNodes = new ArrayList<>();
        targetsMap.put(storageType, listNodes);
      }

      for (DatanodeWithStorage n : nodes) {
        final DatanodeWithStorage.StorageDetails node = getMaxRemaining(n,
            storageType);
        if (node != null) {
          listNodes.add(node);
        }
      }
    }
    return targetsMap;
  }

  private static DatanodeWithStorage.StorageDetails getMaxRemaining(
      DatanodeWithStorage node, StorageType storageType) {
    long max = 0L;
    DatanodeWithStorage.StorageDetails nodeInfo = null;
    List<DatanodeWithStorage.StorageDetails> storages = node
        .getNodesWithStorages(storageType);
    for (DatanodeWithStorage.StorageDetails n : storages) {
      if (n.availableSizeToMove() > max) {
        max = n.availableSizeToMove();
        nodeInfo = n;
      }
    }
    return nodeInfo;
  }

  private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
      List<StorageType> existingStorageTypes,
      List<StorageType> expectedStorageTypes, DatanodeMap liveDns) {
    boolean isExpectedTypeAvailable = false;
    boolean isExistingTypeAvailable = false;
    for (DatanodeWithStorage liveDn : liveDns.getTargets()) {
      if (dn.equals(liveDn.datanode)) {
        for (StorageType eachType : liveDn.getStorageTypes()) {
          if (existingStorageTypes.contains(eachType)) {
            isExistingTypeAvailable = true;
          }
          if (expectedStorageTypes.contains(eachType)) {
            isExpectedTypeAvailable = true;
          }
          if (isExistingTypeAvailable && isExpectedTypeAvailable) {
            return true;
          }
        }
      }
    }
    return isExistingTypeAvailable && isExpectedTypeAvailable;
  }

  /**
   * Maintains storage type map with the available datanodes in the cluster.
   */
  public static class DatanodeMap {
    private final EnumMap<StorageType, List<DatanodeWithStorage>> targetsMap =
        new EnumMap<StorageType, List<DatanodeWithStorage>>(StorageType.class);

    private List<DatanodeWithStorage> targets = new ArrayList<>();

    /**
     * Build datanode map with the available storage types.
     *
     * @param node
     *          datanode
     * @param storageTypes
     *          list of available storage types in the given datanode
     * @param maxSize2Move
     *          available space which can be used for scheduling block move
     */
    void addTarget(DatanodeInfo node, List<StorageType> storageTypes,
        List<Long> maxSize2Move) {
      DatanodeWithStorage nodeStorage = new DatanodeWithStorage(node);
      targets.add(nodeStorage);
      for (int i = 0; i < storageTypes.size(); i++) {
        StorageType type = storageTypes.get(i);
        List<DatanodeWithStorage> nodeStorages = targetsMap.get(type);
        nodeStorage.addStorageType(type, maxSize2Move.get(i));
        if (nodeStorages == null) {
          nodeStorages = new LinkedList<>();
          targetsMap.put(type, nodeStorages);
        }
        nodeStorages.add(nodeStorage);
      }
    }

    List<DatanodeWithStorage> getTarget(StorageType storageType) {
      return targetsMap.get(storageType);
    }

    public List<DatanodeWithStorage> getTargets() {
      return targets;
    }

    void reset() {
      targetsMap.clear();
    }
  }

  /**
   * Keeps a datanode with its respective set of supported storage types. It
   * holds the available space of each volume and is used while pairing
   * target datanodes.
   */
  public static final class DatanodeWithStorage {
    private final EnumMap<StorageType, List<StorageDetails>> storageMap =
        new EnumMap<StorageType, List<StorageDetails>>(StorageType.class);
    private final DatanodeInfo datanode;

    private DatanodeWithStorage(DatanodeInfo datanode) {
      this.datanode = datanode;
    }

    public DatanodeInfo getDatanodeInfo() {
      return datanode;
    }

    Set<StorageType> getStorageTypes() {
      return storageMap.keySet();
    }

    private void addStorageType(StorageType t, long maxSize2Move) {
      List<StorageDetails> nodesWithStorages = getNodesWithStorages(t);
      if (nodesWithStorages == null) {
        nodesWithStorages = new LinkedList<StorageDetails>();
        storageMap.put(t, nodesWithStorages);
      }
      nodesWithStorages.add(new StorageDetails(maxSize2Move));
    }

    /**
     * Returns the storage details of this datanode for the given storage
     * type.
     *
     * @param type
     *          - storage type
     * @return storage details for the given storage type
     */
    private List<StorageDetails> getNodesWithStorages(StorageType type) {
      return storageMap.get(type);
    }

    @Override
    public String toString() {
      return new StringBuilder().append("DatanodeWithStorageInfo(\n  ")
          .append("Datanode: ").append(datanode).append(" StorageTypeNodeMap: ")
          .append(storageMap).append(")").toString();
    }

    /** Storage details in a datanode storage type. */
    final class StorageDetails {
      private final long maxSize2Move;
      private long scheduledSize = 0L;

      private StorageDetails(long maxSize2Move) {
        this.maxSize2Move = maxSize2Move;
      }

      private DatanodeInfo getDatanodeInfo() {
        return DatanodeWithStorage.this.datanode;
      }

      /**
       * Checks whether this datanode storage has sufficient space to occupy the
       * given block size.
       */
      private synchronized boolean hasSpaceForScheduling(long size) {
        return availableSizeToMove() > size;
      }

      /**
       * @return the number of bytes still available for scheduling block
       *         moves on this storage.
       */
      private synchronized long availableSizeToMove() {
        return maxSize2Move - scheduledSize;
      }

      /** Increment scheduled size. */
      private synchronized void incScheduledSize(long size) {
        scheduledSize += size;
      }

      @Override
      public String toString() {
        return new StringBuilder().append("StorageDetails(\n  ")
            .append("maxSize2Move: ").append(maxSize2Move)
            .append(" scheduledSize: ").append(scheduledSize).append(")")
            .toString();
      }
    }
  }

  /**
   * Receives storage movement attempt finished block report.
   *
   * @param dnInfo
   *          reported datanode
   * @param storageType
   *          - storage type
   * @param block
   *          movement attempt finished block.
   */
  @Override
  public void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo,
      StorageType storageType, Block block) {
    storageMovementsMonitor.notifyReportedBlock(dnInfo, storageType, block);
  }

  @VisibleForTesting
  public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
    return storageMovementsMonitor;
  }

  /**
   * Clear the queue of pending storage movement items and the items tracked
   * in the storage movement monitor.
   */
  public void clearQueues() {
    LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
        + "user requests on satisfying block storages would be discarded.");
    storageMovementNeeded.clearAll();
  }

  /**
   * This class contains information about attempted blocks and their last
   * attempted or reported time stamps. It is used by
   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
   */
  public final static class AttemptedItemInfo extends ItemInfo {
    private long lastAttemptedOrReportedTime;
    private final Set<Block> blocks;

    /**
     * AttemptedItemInfo constructor.
     *
     * @param rootId
     *          rootId for trackId
     * @param trackId
     *          trackId for file.
     * @param lastAttemptedOrReportedTime
     *          last attempted or reported time
     * @param blocks
     *          scheduled blocks
     * @param retryCount
     *          file retry count
     */
    public AttemptedItemInfo(long rootId, long trackId,
        long lastAttemptedOrReportedTime,
        Set<Block> blocks, int retryCount) {
      super(rootId, trackId, retryCount);
      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
      this.blocks = blocks;
    }

    /**
     * @return last attempted or reported time stamp.
     */
    long getLastAttemptedOrReportedTime() {
      return lastAttemptedOrReportedTime;
    }

    /**
     * Update lastAttemptedOrReportedTime, so that the expiration time will be
     * postponed to future.
     */
    void touchLastReportedTimeStamp() {
      this.lastAttemptedOrReportedTime = monotonicNow();
    }

    Set<Block> getBlocks() {
      return this.blocks;
    }
  }

  @Override
  public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
    storageMovementNeeded.add(trackInfo, scanCompleted);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added track info for inode {} to block "
          + "storageMovementNeeded queue", trackInfo.getFile());
    }
  }

  @Override
  public void addAllFilesToProcess(long startPath, List<ItemInfo> itemInfoList,
      boolean scanCompleted) {
    getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
  }

  @Override
  public int processingQueueSize() {
    return storageMovementNeeded.size();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @VisibleForTesting
  public BlockStorageMovementNeeded getStorageMovementQueue() {
    return storageMovementNeeded;
  }

  @Override
  public void markScanCompletedForPath(long inodeId) {
    getStorageMovementQueue().markScanCompletedForDir(inodeId);
  }

  /**
   * Join main SPS thread.
   */
  public void join() throws InterruptedException {
    storagePolicySatisfierThread.join();
  }

  /**
   * Remove the overlap between the expected types and the existing types.
   *
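   * For example, with expected = [SSD, DISK, DISK] and existing = [DISK,
   * DISK, DISK], the overlap removal leaves expected = [SSD] and existing =
   * [DISK], so the method returns false and a movement is still required.
   *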
   * @param expected
   *          - Expected storage types list.
   * @param existing
   *          - Existing storage types list.
   * @param ignoreNonMovable
   *          ignore non-movable storage types by removing them from both the
   *          expected and existing storage type lists, to prevent non-movable
   *          storage from being moved.
   * @return true if the existing types or the expected types list is empty
   *         after removing the overlap.
   */
  private static boolean removeOverlapBetweenStorageTypes(
      List<StorageType> expected,
      List<StorageType> existing, boolean ignoreNonMovable) {
    for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
      final StorageType t = i.next();
      if (expected.remove(t)) {
        i.remove();
      }
    }
    if (ignoreNonMovable) {
      removeNonMovable(existing);
      removeNonMovable(expected);
    }
    return expected.isEmpty() || existing.isEmpty();
  }

  private static void removeNonMovable(List<StorageType> types) {
    for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
      final StorageType t = i.next();
      if (!t.isMovable()) {
        i.remove();
      }
    }
  }

  /**
   * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
   * configuration.
   *
   * @param conf Configuration
   * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
   */
  private static int getSPSWorkMultiplier(Configuration conf) {
    int spsWorkMultiplier = conf
        .getInt(
            DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
            DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
    Preconditions.checkArgument(
        (spsWorkMultiplier > 0),
        DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
        " = '" + spsWorkMultiplier + "' is invalid. " +
        "It should be a positive, non-zero integer value.");
    return spsWorkMultiplier;
  }
}
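
Note: in recent Hadoop 3.x releases the same request can also be issued from the command line with "hdfs storagepolicies -satisfyStoragePolicy -path <path>" (assuming the satisfier has been enabled through dfs.storage.policy.satisfier.mode); the blocks are then moved asynchronously by the SPS service shown above.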

Related information

hadoop source code directory

Related articles

hadoop BlockMoveTaskHandler source code

hadoop BlockMovementListener source code

hadoop BlockStorageMovementAttemptedItems source code

hadoop BlockStorageMovementNeeded source code

hadoop Context source code

hadoop DatanodeCacheManager source code

hadoop FileCollector source code

hadoop ItemInfo source code

hadoop SPSService source code

hadoop StoragePolicySatisfyManager source code
