hadoop DatanodeAdminBackoffMonitor 源码

  • 2022-10-20
  • 浏览 (183)

hadoop DatanodeAdminBackoffMonitor 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Map;
import java.util.List;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.stream.Collectors;

/**
 * This class implements the logic to track decommissioning and entering
 * maintenance nodes, ensure all their blocks are adequately replicated
 * before they are moved to the decommissioned or maintenance state.
 *
 * This monitor avoids flooding the replication queue with all pending blocks
 * and instead feeds them to the queue as the prior set completes replication.
 *
 * HDFS-14854 contains details about the overall design of this class.
 *
 */
public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
    implements DatanodeAdminMonitorInterface  {
  /**
   * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
   * datanodes that are being tracked so they can be marked as
   * DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as
   * IN_MAINTENANCE, the node remains in the map until maintenance expires,
   * which is checked during a monitor tick.
   * <p/>
   * This holds a set of references to the under-replicated blocks on the DN
   * at the time the DN is added to the map, i.e. the blocks that are
   * preventing the node from being marked as decommissioned. During a monitor
   * tick, this list is pruned as blocks become replicated.
   * <p/>
   * Note also that the reference to the list of under-replicated blocks
   * will be null on initial add
   * <p/>
   * However, this map can become out-of-date since it is not updated by block
   * reports or other events. Before a node is finally marked as
   * decommissioned, another check is done with the actual block map.
   */
  private HashMap<DatanodeDescriptor, HashMap<BlockInfo, Integer>>
      outOfServiceNodeBlocks = new HashMap<>();

  /**
   * The number of blocks to process when moving blocks to pendingReplication
   * before releasing and reclaiming the namenode lock.
   */
  private int blocksPerLock;

  /**
   * The number of blocks that have been checked on this tick.
   */
  private int numBlocksChecked = 0;
  /**
   * The maximum number of blocks to hold in PendingRep at any time.
   */
  private int pendingRepLimit;

  /**
   * The list of blocks which have been placed onto the replication queue
   * and are waiting to be sufficiently replicated.
   */
  private final Map<DatanodeDescriptor, List<BlockInfo>>
      pendingRep = new HashMap<>();

  private static final Logger LOG =
      LoggerFactory.getLogger(DatanodeAdminBackoffMonitor.class);

  /**
   * Creates a monitor with empty tracking structures. The constructor sets
   * no configuration-derived values; pendingRepLimit and blocksPerLock are
   * populated by processConf().
   */
  DatanodeAdminBackoffMonitor() {
  }


  /**
   * Loads the two backoff-monitor settings from the configuration: the
   * maximum number of blocks held in pendingRep and the number of blocks
   * handled per write-lock hold. A non-positive value for either setting is
   * logged as an error and replaced by that setting's default.
   */
  @Override
  protected void processConf() {
    final String pendingLimitKey =
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
    final int pendingLimitDefault = DFSConfigKeys.
        DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT;
    final String perLockKey = DFSConfigKeys.
        DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
    final int perLockDefault = DFSConfigKeys.
        DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT;

    // Upper bound on the number of blocks waiting in pendingRep.
    this.pendingRepLimit = conf.getInt(pendingLimitKey, pendingLimitDefault);
    if (this.pendingRepLimit < 1) {
      LOG.error("{} is set to an invalid value, it must be greater than "+
              "zero. Defaulting to {}",
          pendingLimitKey, pendingLimitDefault);
      this.pendingRepLimit = pendingLimitDefault;
    }

    // Number of blocks processed before the write lock is yielded.
    this.blocksPerLock = conf.getInt(perLockKey, perLockDefault);
    if (blocksPerLock <= 0) {
      LOG.error("{} is set to an invalid value, it must be greater than "+
              "zero. Defaulting to {}",
          perLockKey, perLockDefault);
      blocksPerLock = perLockDefault;
    }

    LOG.info("Initialized the Backoff Decommission and Maintenance Monitor");
  }

  /**
   * Queue a node to be removed from tracking. The node is removed from the
   * pending queue (in case it has not started being tracked yet) and added
   * to the cancelled queue, which processCancelledNodes() drains on the next
   * monitor tick. This method must be called under the namenode write lock.
   *
   * @param dn The datanode to stop tracking for decommission.
   */
  @Override
  public void stopTrackingNode(DatanodeDescriptor dn) {
    getPendingNodes().remove(dn);
    getCancelledNodes().add(dn);
  }

  /**
   * @return the number of datanodes currently held in
   *         outOfServiceNodeBlocks.
   */
  @Override
  public int getTrackedNodeCount() {
    return outOfServiceNodeBlocks.size();
  }

  /**
   * @return the number of nodes checked per tick, which for this monitor is
   *         the same value as the tracked node count.
   */
  @Override
  public int getNumNodesChecked() {
    // We always check all nodes on each tick
    return outOfServiceNodeBlocks.size();
  }

  /**
   * Executes one monitor tick.
   *
   * Under the namenode write lock the cancelled queue is drained, unhealthy
   * tracked nodes are re-queued if more nodes are decommissioning than the
   * configured tracking limit allows, and pending nodes are moved into
   * tracking. The lock is then released and check() is run, which takes and
   * drops the namenode locks itself as required.
   *
   * Any exception raised while processing is logged and swallowed so that a
   * failure in one tick does not propagate to the caller.
   */
  @Override
  public void run() {
    LOG.debug("DatanodeAdminMonitorV2 is running.");
    if (!namesystem.isRunning()) {
      LOG.info("Namesystem is not running, skipping " +
          "decommissioning/maintenance checks.");
      return;
    }
    // Reset the checked count at beginning of each iteration
    numBlocksChecked = 0;
    // Check decommission or maintenance progress.
    try {
      namesystem.writeLock();
      try {
        // Other threads can modify the pendingNode list and the cancelled
        // node list, so we must process them under the NN write lock to
        // prevent any concurrent modifications.
        //
        // Always process the cancelled list before the pending list, as
        // it is possible for a node to be cancelled, and then quickly added
        // back again. If we process these the other way around, the added
        // node will be removed from tracking by the pending cancel.
        processCancelledNodes();

        // Having more nodes decommissioning than can be tracked will impact
        // decommissioning performance due to queueing delay.
        int numTrackedNodes = outOfServiceNodeBlocks.size();
        int numQueuedNodes = getPendingNodes().size();
        int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
        // A limit of zero means "no limit" (processPendingNodes() admits
        // every pending node in that case), so there is no capacity to
        // exceed and nothing should be warned about or re-queued.
        if (maxConcurrentTrackedNodes != 0
            && numDecommissioningNodes > maxConcurrentTrackedNodes) {
          LOG.warn(
              "{} nodes are decommissioning but only {} nodes will be tracked at a time. "
                  + "{} nodes are currently queued waiting to be decommissioned.",
              numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);

          // Re-queue unhealthy nodes to make space for decommissioning healthy nodes
          final List<DatanodeDescriptor> unhealthyDns = outOfServiceNodeBlocks.keySet().stream()
              .filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn))
              .collect(Collectors.toList());
          getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
            getPendingNodes().add(dn);
            outOfServiceNodeBlocks.remove(dn);
            pendingRep.remove(dn);
          });
        }

        processPendingNodes();
      } finally {
        namesystem.writeUnlock("DatanodeAdminMonitorV2Thread");
      }
      // After processing the above, various parts of the check() method will
      // take and drop the read / write lock as needed. Aside from the
      // cancelled and pending lists, nothing outside of the monitor thread
      // modifies anything inside this class, so many things can be done
      // without any lock.
      check();
    } catch (Exception e) {
      LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
          e);
    }
    // Only report when something was checked or is still being tracked.
    if (numBlocksChecked + outOfServiceNodeBlocks.size() > 0) {
      LOG.info("Checked {} blocks this tick. {} nodes are now " +
          "in maintenance or transitioning state. {} nodes pending. {} " +
          "nodes waiting to be cancelled.",
          numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(),
          getCancelledNodes().size());
    }
  }

  /**
   * Promotes queued nodes into outOfServiceNodeBlocks so that the
   * decommission or maintenance process can begin for them. Nodes are taken
   * from the pending queue until it is empty or the tracking limit is
   * reached; a limit of zero disables the cap. Each promoted node is stored
   * with a null block map, marking it as not yet scanned.
   *
   * Must run under the namenode write lock, because the pending queue can be
   * modified by other threads.
   */
  private void processPendingNodes() {
    while (!getPendingNodes().isEmpty()) {
      final boolean capped = maxConcurrentTrackedNodes != 0;
      if (capped
          && outOfServiceNodeBlocks.size() >= maxConcurrentTrackedNodes) {
        // Tracking is full; leave the remaining nodes queued.
        break;
      }
      final DatanodeDescriptor next = getPendingNodes().poll();
      outOfServiceNodeBlocks.put(next, null);
    }
  }

  /**
   * Drains the cancelled queue, discarding all tracking state (both the
   * pending-replication list and the unprocessed block map) for every node
   * whose decommission or maintenance was cancelled.
   *
   * Must run under the namenode write lock, because the cancelled queue can
   * be modified by other threads.
   */
  private void processCancelledNodes() {
    for (;;) {
      if (getCancelledNodes().isEmpty()) {
        return;
      }
      final DatanodeDescriptor cancelled = getCancelledNodes().poll();
      pendingRep.remove(cancelled);
      outOfServiceNodeBlocks.remove(cancelled);
    }
  }

  /**
   * Drives every tracked node one step closer to its end state. The stages
   * run in this order:
   * <ol>
   *   <li>scan the storage of any node that has not been scanned yet;</li>
   *   <li>expire maintenance on nodes whose maintenance window is over;</li>
   *   <li>prune blocks from pendingRep that are now replicated;</li>
   *   <li>move a bounded number of further blocks into pendingRep;</li>
   *   <li>collect nodes with nothing left to replicate;</li>
   *   <li>move those nodes to their final state.</li>
   * </ol>
   *
   * The methods called here take and release the namenode read and write
   * locks several times.
   */
  private void check() {
    if (outOfServiceNodeBlocks.isEmpty()) {
      // Nothing is being tracked, so there is no work for this tick.
      return;
    }

    // A null block map marks a node whose storage has not been scanned yet.
    // The scan loads the node's block list into outOfServiceNodeBlocks; it
    // does not modify any external structures, so it only needs the
    // namenode *read* lock, which is dropped between storages.
    //
    // TODO - This is an expensive call, depending on how many nodes are
    //        to be processed, but it requires only the read lock and it will
    //        be dropped and re-taken frequently. We may want to throttle this
    //        to process only a few nodes per iteration.
    for (DatanodeDescriptor node : outOfServiceNodeBlocks.keySet()) {
      if (outOfServiceNodeBlocks.get(node) == null) {
        scanDatanodeStorage(node, true);
      }
    }

    processMaintenanceNodes();

    // Drop blocks from pendingRep that have since become replicated. The
    // list is bounded in size, so this should not be overly expensive.
    processPendingReplication();

    // Top pendingRep back up with a limited number of blocks.
    moveBlocksToPending();

    // Find nodes with zero outstanding blocks, then transition them.
    final List<DatanodeDescriptor> completed = new ArrayList<>();
    checkForCompletedNodes(completed);
    processCompletedNodes(completed);
  }

  /**
   * Checks every tracked node for expired maintenance and, for each one
   * found, asks dnAdmin to stop its maintenance.
   *
   * The whole loop runs under the namenode write lock, but the lock is
   * released and immediately re-acquired after each node that is stopped, so
   * other writers get a chance to run between nodes. The outer finally
   * releases the lock that is held when the loop ends.
   */
  private void processMaintenanceNodes() {
    // Check for any maintenance state nodes which need to be expired
    namesystem.writeLock();
    try {
      for (DatanodeDescriptor dn : outOfServiceNodeBlocks.keySet()) {
        if (dn.isMaintenance() && dn.maintenanceExpired()) {
          // If maintenance expires, stop tracking it. This can be an
          // expensive call, as it may need to invalidate blocks. Therefore
          // we can yield and retake the write lock after each node
          //
          // The call to stopMaintenance makes a call to stopTrackingNode()
          // which adds the node to the cancelled list. Therefore expired
          // maintenance nodes do not need to be added to the toRemove list,
          // and outOfServiceNodeBlocks is not modified while it is iterated.
          dnAdmin.stopMaintenance(dn);
          // Yield the write lock, then retake it before the next node.
          namesystem.writeUnlock("processMaintenanceNodes");
          namesystem.writeLock();
        }
      }
    } finally {
      namesystem.writeUnlock("processMaintenanceNodes");
    }
  }

  /**
   * Transitions each node in the given list to its end state, under the
   * namenode write lock.
   *
   * An unhealthy node is only logged and left as it is. A healthy node is
   * handled according to its current admin state:
   * <ul>
   *   <li>decommission in progress: marked decommissioned and removed from
   *       all tracking;</li>
   *   <li>entering maintenance: marked in-maintenance and removed from
   *       pendingRep only, so maintenance expiry can still be tracked;</li>
   *   <li>in service or any other state: removed from all tracking.</li>
   * </ul>
   *
   * @param toRemove The list of nodes to process for completion.
   */
  private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
    if (toRemove.isEmpty()) {
      // Nothing to do, so avoid taking the write lock at all.
      return;
    }
    namesystem.writeLock();
    try {
      for (DatanodeDescriptor node : toRemove) {
        if (!blockManager.isNodeHealthyForDecommissionOrMaintenance(node)) {
          LOG.info("Node {} isn't healthy."
                  + " It needs to replicate {} more blocks."
                  + " {} is still in progress.", node,
              getPendingCountForNode(node), node.getAdminState());
          continue;
        }

        if (node.isDecommissionInProgress()) {
          dnAdmin.setDecommissioned(node);
          outOfServiceNodeBlocks.remove(node);
          pendingRep.remove(node);
        } else if (node.isEnteringMaintenance()) {
          // The IN_MAINTENANCE node stays in outOfServiceNodeBlocks so that
          // maintenance expiration can be tracked.
          dnAdmin.setInMaintenance(node);
          pendingRep.remove(node);
        } else {
          if (node.isInService()) {
            // Decom / maint was cancelled and the node is yet to be
            // processed from cancelledNodes
            LOG.info("Node {} completed decommission and maintenance " +
                "but has been moved back to in service", node);
          } else {
            // Should not happen
            LOG.error("Node {} is in an unexpected state {} and has been "+
                    "removed from tracking for decommission or maintenance",
                node, node.getAdminState());
          }
          pendingRep.remove(node);
          outOfServiceNodeBlocks.remove(node);
          continue;
        }

        LOG.info("Node {} is sufficiently replicated and healthy, "
            + "marked as {}.", node, node.getAdminState());
      }
    } finally {
      namesystem.writeUnlock("processCompletedNodes");
    }
  }

  /**
   * Finds tracked nodes that have no blocks left to process.
   *
   * Nodes already in maintenance and nodes that are back in service are
   * skipped. For every other node, if the outstanding block count is zero
   * the storage is rescanned to catch blocks missed on an earlier pass. If
   * the count is still zero after the rescan, the node is appended to the
   * supplied list.
   *
   * @param removeList Nodes which have zero pending blocks are added to this
   *                   list.
   */
  private void checkForCompletedNodes(List<DatanodeDescriptor> removeList) {
    for (DatanodeDescriptor node : outOfServiceNodeBlocks.keySet()) {
      if (node.isInMaintenance()) {
        // Already in maintenance, so no further checks are needed.
        LOG.debug("Node {} is currently in maintenance", node);
        continue;
      }
      if (node.isInService()) {
        // Decom or maint was cancelled but the cancelled list has not been
        // processed yet; there is nothing to check for such a node.
        continue;
      }

      int remaining = getPendingCountForNode(node);
      if (remaining == 0) {
        scanDatanodeStorage(node, false);
        remaining = getPendingCountForNode(node);
      }
      LOG.info("Node {} has {} blocks yet to process", node, remaining);
      if (remaining == 0) {
        removeList.add(node);
      }
    }
  }

  /**
   * Counts the blocks still outstanding for a node: those not yet processed
   * (outOfServiceNodeBlocks) plus those awaiting replication (pendingRep).
   * A missing or null entry in either map contributes zero.
   *
   * @param dn The datanode to return the count for
   * @return The total block count, or zero if none are pending
   */
  private int getPendingCountForNode(DatanodeDescriptor dn) {
    final HashMap<BlockInfo, Integer> unprocessed =
        outOfServiceNodeBlocks.get(dn);
    final List<BlockInfo> awaitingReplication = pendingRep.get(dn);

    final int unprocessedCount =
        (unprocessed == null) ? 0 : unprocessed.size();
    final int awaitingCount =
        (awaitingReplication == null) ? 0 : awaitingReplication.size();
    return unprocessedCount + awaitingCount;
  }

  /**
   * Iterate across all nodes in outOfServiceNodeBlocks which have blocks yet
   * to be processed, taking one block needing replication from each node in
   * turn (round robin).
   *
   * Each block examined is removed from outOfServiceNodeBlocks. If it needs
   * replication it is added to the pendingRep map and also offered to the
   * BlockManager replication queue (see nextBlockAddedToPending). Any block
   * that does not need replication is discarded.
   *
   * The method returns when the pendingRep map holds pendingRepLimit blocks
   * or there are no further blocks to process. It runs under the namenode
   * write lock, which is released and re-acquired after every blocksPerLock
   * blocks examined.
   */
  private void moveBlocksToPending() {
    int blocksProcessed = 0;
    int pendingCount = getPendingCount();
    int yetToBeProcessed = getYetToBeProcessedCount();

    if (pendingCount == 0 && yetToBeProcessed == 0) {
      // There are no blocks to process so just return
      LOG.debug("There are no pending or blocks yet to be processed");
      return;
    }

    namesystem.writeLock();
    try {
      long repQueueSize = blockManager.getLowRedundancyBlocksCount();

      LOG.info("There are {} blocks pending replication and the limit is "+
          "{}. A further {} blocks are waiting to be processed. "+
          "The replication queue currently has {} blocks",
          pendingCount, pendingRepLimit, yetToBeProcessed, repQueueSize);

      if (pendingCount >= pendingRepLimit) {
        // Only add more blocks to the replication queue if we don't already
        // have too many pending
        return;
      }

      // Create a "Block Iterator" for each node decommissioning or entering
      // maintenance. These iterators will be used "round robined" to add blocks
      // to the replication queue and PendingRep.
      // NOTE(review): e.getValue() is dereferenced without a null check, so
      // this relies on every tracked node having been scanned beforehand
      // (check() scans null entries before calling this method).
      HashMap<DatanodeDescriptor, Iterator<BlockInfo>>
          iterators = new HashMap<>();
      for (Map.Entry<DatanodeDescriptor, HashMap<BlockInfo, Integer>> e
          : outOfServiceNodeBlocks.entrySet()) {
        iterators.put(e.getKey(), e.getValue().keySet().iterator());
      }

      // Now loop until we fill the pendingRep map with pendingRepLimit blocks
      // or run out of blocks to add. The cycling iterator keeps revisiting
      // the remaining nodes; nodeIter.remove() below drops exhausted nodes
      // from the iterators map.
      Iterator<DatanodeDescriptor> nodeIter =
          Iterables.cycle(iterators.keySet()).iterator();
      while (nodeIter.hasNext()) {
        // Cycle through each node with blocks which still need to be
        // processed
        DatanodeDescriptor dn = nodeIter.next();
        Iterator<BlockInfo> blockIt = iterators.get(dn);
        while (blockIt.hasNext()) {
          // Process the blocks for the node until we find one that needs
          // replication
          if (blocksProcessed >= blocksPerLock) {
            // Yield the write lock so other writers can make progress, then
            // retake it before touching any more blocks.
            blocksProcessed = 0;
            namesystem.writeUnlock("moveBlocksToPending");
            namesystem.writeLock();
          }
          blocksProcessed++;
          if (nextBlockAddedToPending(blockIt, dn)) {
            // Exit the inner "block" loop so an iterator for the next datanode
            // is used for the next block.
            pendingCount++;
            break;
          }
        }
        if (!blockIt.hasNext()) {
          // remove the iterator as there are no blocks left in it
          nodeIter.remove();
        }
        if (pendingCount >= pendingRepLimit) {
          // We have scheduled the limit of blocks for replication, so do
          // not add any more
          break;
        }
      }
    } finally {
      namesystem.writeUnlock("moveBlocksToPending");
    }
    LOG.debug("{} blocks are now pending replication", pendingCount);
  }

  /**
   * Consumes the next block from the iterator (removing it from the
   * underlying collection) and checks whether it still needs replicas. A
   * block that does is scheduled for reconstruction by the replication check
   * and appended to the node's list in pendingRep.
   *
   * @param it The iterator to take the next block from
   * @param dn The datanodeDescriptor the iterator applies to
   * @return True if the block needs replication, otherwise false
   */
  private boolean nextBlockAddedToPending(Iterator<BlockInfo> it,
      DatanodeDescriptor dn) {
    final BlockInfo candidate = it.next();
    it.remove();
    numBlocksChecked++;

    if (isBlockReplicatedOk(dn, candidate, true, null)) {
      // Already sufficiently replicated, so the block is simply dropped.
      return false;
    }

    List<BlockInfo> queued = pendingRep.get(dn);
    if (queued == null) {
      queued = new LinkedList<>();
      pendingRep.put(dn, queued);
    }
    queued.add(candidate);
    return true;
  }

  /**
   * @return the total number of blocks held in pendingRep across all nodes,
   *         or zero when the map is empty.
   */
  private int getPendingCount() {
    int total = 0;
    for (List<BlockInfo> queued : pendingRep.values()) {
      total += queued.size();
    }
    return total;
  }

  /**
   * @return the total number of blocks held in outOfServiceNodeBlocks across
   *         all nodes, i.e. blocks not yet moved to pendingRep or discarded;
   *         zero when no nodes are tracked.
   */
  private int getYetToBeProcessedCount() {
    int total = 0;
    for (HashMap<BlockInfo, Integer> unprocessed
        : outOfServiceNodeBlocks.values()) {
      total += unprocessed.size();
    }
    return total;
  }

  /**
   * Walks every block on every storage of a datanode and records the blocks
   * that need tracking in the node's entry of outOfServiceNodeBlocks,
   * creating that entry's map if it does not exist yet.
   *
   * On the initial scan of a decommissioning node every block is recorded
   * without checking it, on the assumption that most of them will need new
   * replicas anyway. On a rescan, or for a node entering maintenance, most
   * blocks are expected to be fine already, so each block is checked and
   * only the insufficiently replicated ones are recorded.
   *
   * No blocks are scheduled for reconstruction here, so the scan runs under
   * the namenode read lock, which is dropped and re-acquired for each
   * storage on the DN.
   *
   * @param dn - The datanode to process
   * @param initialScan - True if this is the first time scanning the node
   *                    or false if it is a rescan.
   */
  private void scanDatanodeStorage(DatanodeDescriptor dn,
                                   Boolean initialScan) {
    HashMap<BlockInfo, Integer> tracked = outOfServiceNodeBlocks.get(dn);
    if (tracked == null) {
      tracked = new HashMap<>();
      outOfServiceNodeBlocks.put(dn, tracked);
    }

    // Snapshot the storage list under the read lock.
    final DatanodeStorageInfo[] storages;
    namesystem.readLock();
    try {
      storages = dn.getStorageInfos();
    } finally {
      namesystem.readUnlock("scanDatanodeStorage");
    }

    for (DatanodeStorageInfo current : storages) {
      namesystem.readLock();
      try {
        // The lock was dropped since the snapshot was taken, so the storage
        // may have been removed in the meantime; only process it if the
        // datanode still reports it.
        if (dn.getStorageInfo(current.getStorageID()) != null) {
          final Iterator<BlockInfo> blocks = current.getBlockIterator();
          while (blocks.hasNext()) {
            final BlockInfo block = blocks.next();
            // Check immediately on a rescan, or when the node is entering
            // maintenance (on a healthy cluster using racks or upgrade
            // domains few blocks should need replicating in that case).
            final boolean verifyNow =
                !initialScan || dn.isEnteringMaintenance();
            if (!verifyNow || !isBlockReplicatedOk(dn, block, false, null)) {
              tracked.put(block, null);
            }
            numBlocksChecked++;
          }
        }
      } finally {
        namesystem.readUnlock("scanDatanodeStorage");
      }
    }
  }

  /**
   * Re-checks every block in pendingRep, i.e. the blocks that were moved out
   * of outOfServiceNodeBlocks because they needed replication.
   *
   * Blocks that are now sufficiently replicated are removed from their
   * node's list, and a node whose list becomes empty is removed from
   * pendingRep. For each node visited, the leaving-service status is
   * refreshed with the open-file block count, the set of open files, the
   * node's total outstanding block count and the number of blocks found
   * only on out-of-service nodes.
   *
   * The replication check may add blocks to the replication queue, so the
   * whole method runs under the namenode write lock.
   */
  private void processPendingReplication() {
    namesystem.writeLock();
    try {
      final Iterator<Map.Entry<DatanodeDescriptor, List<BlockInfo>>> nodes =
          pendingRep.entrySet().iterator();
      while (nodes.hasNext()) {
        final Map.Entry<DatanodeDescriptor, List<BlockInfo>> entry =
            nodes.next();
        final List<BlockInfo> queued = entry.getValue();
        if (queued == null) {
          // should not be able to happen
          nodes.remove();
          continue;
        }

        final DatanodeDescriptor node = entry.getKey();
        final BlockStats stats = new BlockStats();
        for (Iterator<BlockInfo> blockIt = queued.iterator();
             blockIt.hasNext();) {
          final BlockInfo block = blockIt.next();
          if (isBlockReplicatedOk(node, block, true, stats)) {
            blockIt.remove();
          }
          numBlocksChecked++;
        }
        if (queued.isEmpty()) {
          nodes.remove();
        }

        // Update metrics for this datanode.
        node.getLeavingServiceStatus().set(
            stats.getOpenFileCount(),
            stats.getOpenFiles(),
            getPendingCountForNode(node),
            stats.getOutOfServiceBlockCount());
      }
    } finally {
      namesystem.writeUnlock("processPendingReplication");
    }
  }

  /**
   * Checks if a block is sufficiently replicated and optionally schedules
   * it for reconstruction if it is not.
   *
   * If a BlockStats object is passed, this method will also update it if the
   * block is part of an open file or only on outOfService nodes.
   *
   * @param datanode The datanode the block belongs to
   * @param block The block to check
   * @param scheduleReconStruction Whether to add the block to the replication
   *                               queue if it is not sufficiently replicated.
   *                               Passing true will add it to the replication
   *                               queue, and false will not.
   * @param suspectBlocks If non-null check if the block is part of an open
   *                      file or only on out of service nodes and update the
   *                      passed object accordingly.
   * @return true if the block is no longer in the block map, or if dnAdmin
   *         considers its redundancy sufficient for the datanode's
   *         decommission / maintenance transition; false for an orphan
   *         block or a block that still lacks redundancy.
   */
  private boolean isBlockReplicatedOk(DatanodeDescriptor datanode,
      BlockInfo block, Boolean scheduleReconStruction,
      BlockStats suspectBlocks) {
    if (blockManager.blocksMap.getStoredBlock(block) == null) {
      // The block is no longer known to the block map, so it cannot hold up
      // the node.
      LOG.trace("Removing unknown block {}", block);
      return true;
    }

    long bcId = block.getBlockCollectionId();
    if (bcId == INodeId.INVALID_INODE_ID) {
      // Orphan block, will be invalidated eventually. Skip.
      return false;
    }

    final BlockCollection bc = blockManager.getBlockCollection(block);
    final NumberReplicas num = blockManager.countNodes(block);
    final int liveReplicas = num.liveReplicas();

    // Schedule low redundancy blocks for reconstruction
    // if not already pending.
    boolean isDecommission = datanode.isDecommissionInProgress();
    boolean isMaintenance = datanode.isEnteringMaintenance();

    boolean neededReconstruction = isDecommission ?
        blockManager.isNeededReconstruction(block, num) :
        blockManager.isNeededReconstructionForMaintenance(block, num);
    if (neededReconstruction && scheduleReconStruction) {
      if (!blockManager.neededReconstruction.contains(block) &&
          blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
          blockManager.isPopulatingReplQueues()) {
        // Process these blocks only when active NN is out of safe mode.
        blockManager.neededReconstruction.add(block,
            liveReplicas, num.readOnlyReplicas(),
            num.outOfServiceReplicas(),
            blockManager.getExpectedRedundancyNum(block));
      }
    }

    if (suspectBlocks != null) {
      // Only if we pass a BlockStats object should we do these
      // checks, as they should only be checked when processing PendingRep.
      if (bc.isUnderConstruction()) {
        INode ucFile = namesystem.getFSDirectory().getInode(bc.getId());
        if (!(ucFile instanceof INodeFile) ||
            !ucFile.asFile().isUnderConstruction()) {
          // instanceof is false for null, so ucFile may be null on this
          // branch; do not dereference it blindly when building the message.
          LOG.warn("File {} is not under construction. Skipping add to " +
              "low redundancy open files!",
              ucFile == null
                  ? "<missing inode " + bc.getId() + ">"
                  : ucFile.getLocalName());
        } else {
          suspectBlocks.addOpenFile(ucFile.getId());
        }
      }
      if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
        suspectBlocks.incrementOutOfServiceBlocks();
      }
    }

    // A block may have been flagged for reconstruction above and still be
    // acceptable for this transition; dnAdmin.isSufficient makes the final
    // decision on whether it blocks decommission / maintenance.
    return dnAdmin.isSufficient(block, bc, num, isDecommission, isMaintenance);
  }

  /**
   * Per-datanode statistics collected while pending blocks are checked: the
   * set of open file ids seen, how many blocks belonged to open files, and
   * how many blocks had replicas only on out-of-service nodes.
   */
  static class BlockStats {
    /** Ids of the open files that at least one checked block belongs to. */
    private final LightWeightHashSet<Long> openFiles =
        new LightWeightLinkedSet<>();
    /** Number of addOpenFile calls, i.e. blocks seen in open files. */
    private int openFileBlockCount;
    /** Number of incrementOutOfServiceBlocks calls. */
    private int outOfServiceBlockCount;

    /**
     * Records one block belonging to the given open file. Several blocks
     * can be part of the same file, so the file id may be added repeatedly;
     * the block counter is incremented on every call.
     *
     * @param id the id of the open file the block belongs to
     */
    public void addOpenFile(long id) {
      openFiles.add(id);
      openFileBlockCount++;
    }

    /** Records one more block found only on out-of-service nodes. */
    public void incrementOutOfServiceBlocks() {
      outOfServiceBlockCount++;
    }

    /** @return the number of times addOpenFile has been called. */
    public int getOpenFileCount() {
      return openFileBlockCount;
    }

    /** @return the set of open file ids recorded so far. */
    public LightWeightHashSet<Long> getOpenFiles() {
      return openFiles;
    }

    /** @return the number of blocks counted as out-of-service only. */
    public int getOutOfServiceBlockCount() {
      return outOfServiceBlockCount;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AvailableSpaceBlockPlacementPolicy 源码

hadoop AvailableSpaceRackFaultTolerantBlockPlacementPolicy 源码

hadoop BlockCollection 源码

hadoop BlockIdManager 源码

hadoop BlockInfo 源码

hadoop BlockInfoContiguous 源码

hadoop BlockInfoStriped 源码

hadoop BlockManager 源码

hadoop BlockManagerFaultInjector 源码

hadoop BlockManagerSafeMode 源码

0  赞