hadoop NetworkTopology 源码

  • 2022-10-20
  • 浏览 (196)

haddop NetworkTopology 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.net;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

/** The class represents a cluster of computer with a tree hierarchical
 * network topology.
 * For example, a cluster may be consists of many data centers filled 
 * with racks of computers.
 * In a network topology, leaves represent data nodes (computers) and inner
 * nodes represent switches/routers that manage traffic in/out of data centers
 * or racks.  
 * 
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public class NetworkTopology {
  public final static String DEFAULT_RACK = "/default-rack";
  public static final Logger LOG =
      LoggerFactory.getLogger(NetworkTopology.class);

  private static final char PATH_SEPARATOR = '/';
  private static final String PATH_SEPARATOR_STR = "/";
  private static final String ROOT = "/";
  private static final AtomicReference<Random> RANDOM_REF =
      new AtomicReference<>();

  public static class InvalidTopologyException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    public InvalidTopologyException(String msg) {
      super(msg);
    }
  }
  
  /**
   * Get an instance of NetworkTopology based on the value of the configuration
   * parameter net.topology.impl.
   * 
   * @param conf the configuration to be used
   * @return an instance of NetworkTopology
   */
  public static NetworkTopology getInstance(Configuration conf){
    return getInstance(conf, InnerNodeImpl.FACTORY);
  }

  public static NetworkTopology getInstance(Configuration conf,
      InnerNode.Factory factory) {
    NetworkTopology nt = ReflectionUtils.newInstance(
        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
            NetworkTopology.class, NetworkTopology.class), conf);
    return nt.init(factory);
  }

  protected NetworkTopology init(InnerNode.Factory factory) {
    if (!factory.equals(this.factory)) {
      // the constructor has initialized the factory to default. So only init
      // again if another factory is specified.
      this.factory = factory;
      this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
    }
    return this;
  }

  InnerNode.Factory factory;
  /**
   * the root cluster map
   */
  InnerNode clusterMap;
  /** Depth of all leaf nodes */
  private int depthOfAllLeaves = -1;
  /** rack counter */
  protected int numOfRacks = 0;
  /** empty rack map, rackname->nodenumber. */
  private HashMap<String, Set<String>> rackMap =
      new HashMap<String, Set<String>>();
  /** decommission nodes, contained stoped nodes. */
  private HashSet<String> decommissionNodes = new HashSet<>();
  /** empty rack counter. */
  private int numOfEmptyRacks = 0;

  /**
   * Whether or not this cluster has ever consisted of more than 1 rack,
   * according to the NetworkTopology.
   */
  private boolean clusterEverBeenMultiRack = false;

  /** the lock used to manage access */
  protected ReadWriteLock netlock = new ReentrantReadWriteLock(true);

  // keeping the constructor because other components like MR still uses this.
  public NetworkTopology() {
    this.factory = InnerNodeImpl.FACTORY;
    this.clusterMap = factory.newInnerNode(NodeBase.ROOT);
  }

  /** Add a leaf node
   * Update node counter &amp; rack counter if necessary
   * @param node node to be added; can be null
   * @exception IllegalArgumentException if add a node to a leave 
                                         or node to be added is not a leaf
   */
  public void add(Node node) {
    if (node==null) return;
    int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
    netlock.writeLock().lock();
    try {
      if( node instanceof InnerNode ) {
        throw new IllegalArgumentException(
          "Not allow to add an inner node: "+NodeBase.getPath(node));
      }
      if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
        LOG.error("Error: can't add leaf node {} at depth {} to topology:{}\n",
            NodeBase.getPath(node), newDepth, this);
        throw new InvalidTopologyException("Failed to add " + NodeBase.getPath(node) +
            ": You cannot have a rack and a non-rack node at the same " +
            "level of the network topology.");
      }
      Node rack = getNodeForNetworkLocation(node);
      if (rack != null && !(rack instanceof InnerNode)) {
        throw new IllegalArgumentException("Unexpected data node " 
                                           + node.toString() 
                                           + " at an illegal network location");
      }
      if (clusterMap.add(node)) {
        LOG.info("Adding a new node: "+NodeBase.getPath(node));
        if (rack == null) {
          incrementRacks();
        }
        interAddNodeWithEmptyRack(node);
        if (depthOfAllLeaves == -1) {
          depthOfAllLeaves = node.getLevel();
        }
      }
      LOG.debug("NetworkTopology became:\n{}", this);
    } finally {
      netlock.writeLock().unlock();
    }
  }

  protected void incrementRacks() {
    numOfRacks++;
    if (!clusterEverBeenMultiRack && numOfRacks > 1) {
      clusterEverBeenMultiRack = true;
    }
  }

  /**
   * Return a reference to the node given its string representation.
   * Default implementation delegates to {@link #getNode(String)}.
   * 
   * <p>To be overridden in subclasses for specific NetworkTopology 
   * implementations, as alternative to overriding the full {@link #add(Node)}
   *  method.
   * 
   * @param node The string representation of this node's network location is
   * used to retrieve a Node object. 
   * @return a reference to the node; null if the node is not in the tree
   * 
   * @see #add(Node)
   * @see #getNode(String)
   */
  protected Node getNodeForNetworkLocation(Node node) {
    return getNode(node.getNetworkLocation());
  }
  
  /**
   * Given a string representation of a rack, return its children
   * @param loc a path-like string representation of a rack
   * @return a newly allocated list with all the node's children
   */
  public List<Node> getDatanodesInRack(String loc) {
    netlock.readLock().lock();
    try {
      loc = NodeBase.normalize(loc);
      if (!NodeBase.ROOT.equals(loc)) {
        loc = loc.substring(1);
      }
      InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
      return (rack == null) ? new ArrayList<>(0)
          : new ArrayList<>(rack.getChildren());
    } finally {
      netlock.readLock().unlock();
    }
  }

  /** Remove a node
   * Update node counter and rack counter if necessary
   * @param node node to be removed; can be null
   */ 
  public void remove(Node node) {
    if (node==null) return;
    if( node instanceof InnerNode ) {
      throw new IllegalArgumentException(
        "Not allow to remove an inner node: "+NodeBase.getPath(node));
    }
    LOG.info("Removing a node: "+NodeBase.getPath(node));
    netlock.writeLock().lock();
    try {
      if (clusterMap.remove(node)) {
        InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
        if (rack == null) {
          numOfRacks--;
        }
        interRemoveNodeWithEmptyRack(node);
      }
      LOG.debug("NetworkTopology became:\n{}", this);
    } finally {
      netlock.writeLock().unlock();
    }
  }

  /** Check if the tree contains node <i>node</i>
   * 
   * @param node a node
   * @return true if <i>node</i> is already in the tree; false otherwise
   */
  public boolean contains(Node node) {
    if (node == null) return false;
    netlock.readLock().lock();
    try {
      Node parent = node.getParent();
      for (int level = node.getLevel(); parent != null && level > 0;
           parent = parent.getParent(), level--) {
        if (parent == clusterMap) {
          return true;
        }
      }
    } finally {
      netlock.readLock().unlock();
    }
    return false; 
  }
    
  /** Given a string representation of a node, return its reference
   * 
   * @param loc
   *          a path-like string representation of a node
   * @return a reference to the node; null if the node is not in the tree
   */
  public Node getNode(String loc) {
    netlock.readLock().lock();
    try {
      loc = NodeBase.normalize(loc);
      if (!NodeBase.ROOT.equals(loc))
        loc = loc.substring(1);
      return clusterMap.getLoc(loc);
    } finally {
      netlock.readLock().unlock();
    }
  }

  /**
   * @return true if this cluster has ever consisted of multiple racks, even if
   *         it is not now a multi-rack cluster.
   */
  public boolean hasClusterEverBeenMultiRack() {
    return clusterEverBeenMultiRack;
  }

  /** Given a string representation of a rack for a specific network
   *  location
   *
   * To be overridden in subclasses for specific NetworkTopology 
   * implementations, as alternative to overriding the full 
   * {@link #getRack(String)} method.
   * @param loc
   *          a path-like string representation of a network location
   * @return a rack string
   */
  public String getRack(String loc) {
    return loc;
  }
  
  /** @return the total number of racks */
  public int getNumOfRacks() {
    return numOfRacks;
  }

  /** @return the total number of leaf nodes */
  public int getNumOfLeaves() {
    return clusterMap.getNumOfLeaves();
  }

  /** Return the distance between two nodes
   * It is assumed that the distance from one node to its parent is 1
   * The distance between two nodes is calculated by summing up their distances
   * to their closest common ancestor.
   * @param node1 one node
   * @param node2 another node
   * @return the distance between node1 and node2 which is zero if they are the same
   *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
   */
  public int getDistance(Node node1, Node node2) {
    if ((node1 != null && node1.equals(node2)) ||
        (node1 == null && node2 == null))  {
      return 0;
    }
    if (node1 == null || node2 == null) {
      LOG.warn("One of the nodes is a null pointer");
      return Integer.MAX_VALUE;
    }
    Node n1=node1, n2=node2;
    int dis = 0;
    netlock.readLock().lock();
    try {
      int level1=node1.getLevel(), level2=node2.getLevel();
      while(n1!=null && level1>level2) {
        n1 = n1.getParent();
        level1--;
        dis++;
      }
      while(n2!=null && level2>level1) {
        n2 = n2.getParent();
        level2--;
        dis++;
      }
      while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
        n1=n1.getParent();
        n2=n2.getParent();
        dis+=2;
      }
    } finally {
      netlock.readLock().unlock();
    }
    if (n1==null) {
      LOG.warn("The cluster does not contain node: "+NodeBase.getPath(node1));
      return Integer.MAX_VALUE;
    }
    if (n2==null) {
      LOG.warn("The cluster does not contain node: "+NodeBase.getPath(node2));
      return Integer.MAX_VALUE;
    }
    return dis+2;
  }

  /** Return the distance between two nodes by comparing their network paths
   * without checking if they belong to the same ancestor node by reference.
   * It is assumed that the distance from one node to its parent is 1
   * The distance between two nodes is calculated by summing up their distances
   * to their closest common ancestor.
   * @param node1 one node
   * @param node2 another node
   * @return the distance between node1 and node2
   */
  static public int getDistanceByPath(Node node1, Node node2) {
    if (node1 == null && node2 == null) {
      return 0;
    }
    if (node1 == null || node2 == null) {
      LOG.warn("One of the nodes is a null pointer");
      return Integer.MAX_VALUE;
    }
    String[] paths1 = NodeBase.getPathComponents(node1);
    String[] paths2 = NodeBase.getPathComponents(node2);
    int dis = 0;
    int index = 0;
    int minLevel = Math.min(paths1.length, paths2.length);
    while (index < minLevel) {
      if (!paths1[index].equals(paths2[index])) {
        // Once the path starts to diverge,  compute the distance that include
        // the rest of paths.
        dis += 2 * (minLevel - index);
        break;
      }
      index++;
    }
    dis += Math.abs(paths1.length - paths2.length);
    return dis;
  }

  /** Check if two nodes are on the same rack
   * @param node1 one node (can be null)
   * @param node2 another node (can be null)
   * @return true if node1 and node2 are on the same rack; false otherwise
   * @exception IllegalArgumentException when either node1 or node2 is null, or
   * node1 or node2 do not belong to the cluster
   */
  public boolean isOnSameRack(Node node1, Node node2) {
    if (node1 == null || node2 == null) {
      return false;
    }

    return isSameParents(node1, node2);
  }
  
  /**
   * @return Check if network topology is aware of NodeGroup.
   */
  public boolean isNodeGroupAware() {
    return false;
  }
  
  /** 
   * @return Return false directly as not aware of NodeGroup, to be override in sub-class.
   * @param node1 input node1.
   * @param node2 input node2.
   */
  public boolean isOnSameNodeGroup(Node node1, Node node2) {
    return false;
  }

  /**
   * Compare the parents of each node for equality
   * 
   * <p>To be overridden in subclasses for specific NetworkTopology 
   * implementations, as alternative to overriding the full 
   * {@link #isOnSameRack(Node, Node)} method.
   * 
   * @param node1 the first node to compare
   * @param node2 the second node to compare
   * @return true if their parents are equal, false otherwise
   * 
   * @see #isOnSameRack(Node, Node)
   */
  protected boolean isSameParents(Node node1, Node node2) {
    return node1.getParent()==node2.getParent();
  }

  @VisibleForTesting
  void setRandomSeed(long seed) {
    RANDOM_REF.set(new Random(seed));
  }

  Random getRandom() {
    Random random = RANDOM_REF.get();
    return (random == null) ? ThreadLocalRandom.current() : random;
  }

  /**
   * Randomly choose a node.
   *
   * @param scope range of nodes from which a node will be chosen
   * @return the chosen node
   *
   * @see #chooseRandom(String, Collection)
   */
  public Node chooseRandom(final String scope) {
    return chooseRandom(scope, null);
  }

  /**
   * Randomly choose one node from <i>scope</i>.
   *
   * If scope starts with ~, choose one from the all nodes except for the
   * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
   * If excludedNodes is given, choose a node that's not in excludedNodes.
   *
   * @param scope range of nodes from which a node will be chosen
   * @param excludedNodes nodes to be excluded from
   * @return the chosen node
   */
  public Node chooseRandom(final String scope,
      final Collection<Node> excludedNodes) {
    netlock.readLock().lock();
    try {
      if (scope.startsWith("~")) {
        return chooseRandom(NodeBase.ROOT, scope.substring(1), excludedNodes);
      } else {
        return chooseRandom(scope, null, excludedNodes);
      }
    } finally {
      netlock.readLock().unlock();
    }
  }

  protected Node chooseRandom(final String scope, String excludedScope,
      final Collection<Node> excludedNodes) {
    if (excludedScope != null) {
      if (isChildScope(scope, excludedScope)) {
        return null;
      }
      if (!isChildScope(excludedScope, scope)) {
        excludedScope = null;
      }
    }
    Node node = getNode(scope);
    if (!(node instanceof InnerNode)) {
      return excludedNodes != null && excludedNodes.contains(node) ?
          null : node;
    }
    InnerNode innerNode = (InnerNode)node;
    int numOfDatanodes = innerNode.getNumOfLeaves();
    if (excludedScope == null) {
      node = null;
    } else {
      node = getNode(excludedScope);
      if (!(node instanceof InnerNode)) {
        numOfDatanodes -= 1;
      } else {
        numOfDatanodes -= ((InnerNode)node).getNumOfLeaves();
      }
    }
    if (numOfDatanodes <= 0) {
      LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\"). numOfDatanodes={}",
          scope, excludedScope, numOfDatanodes);
      return null;
    }
    final int availableNodes;
    if (excludedScope == null) {
      availableNodes = countNumOfAvailableNodes(scope, excludedNodes);
    } else {
      netlock.readLock().lock();
      try {
        availableNodes = countNumOfAvailableNodes(scope, excludedNodes) -
            countNumOfAvailableNodes(excludedScope, excludedNodes);
      } finally {
        netlock.readLock().unlock();
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Choosing random from {} available nodes on node {}, scope={},"
              + " excludedScope={}, excludeNodes={}. numOfDatanodes={}.",
          availableNodes, innerNode, scope, excludedScope, excludedNodes,
          numOfDatanodes);
    }
    Node ret = null;
    if (availableNodes > 0) {
      ret = chooseRandom(innerNode, node, excludedNodes, numOfDatanodes,
          availableNodes);
    }
    LOG.debug("chooseRandom returning {}", ret);
    return ret;
  }

  /**
   * Randomly choose one node under <i>parentNode</i>, considering the exclude
   * nodes and scope. Should be called with {@link #netlock}'s readlock held.
   *
   * @param parentNode        the parent node
   * @param excludedScopeNode the node corresponding to the exclude scope.
   * @param excludedNodes     a collection of nodes to be excluded from
   * @param totalInScopeNodes total number of nodes under parentNode, excluding
   *                          the excludedScopeNode
   * @param availableNodes    number of available nodes under parentNode that
   *                          could be chosen, excluding excludedNodes
   * @return the chosen node, or null if none can be chosen
   */
  private Node chooseRandom(final InnerNode parentNode,
      final Node excludedScopeNode, final Collection<Node> excludedNodes,
      final int totalInScopeNodes, final int availableNodes) {
    if (totalInScopeNodes < availableNodes) {
      LOG.warn("Total Nodes in scope : {} are less than Available Nodes : {}",
          totalInScopeNodes, availableNodes);
      return null;
    }
    Random r = getRandom();
    if (excludedNodes == null || excludedNodes.isEmpty()) {
      // if there are no excludedNodes, randomly choose a node
      final int index = r.nextInt(totalInScopeNodes);
      return parentNode.getLeaf(index, excludedScopeNode);
    }

    // excludedNodes non empty.
    // Choose the nth VALID node, where n is random. VALID meaning it can be
    // returned, after considering exclude scope and exclude nodes.
    // The probability of being chosen should be equal for all VALID nodes.
    // Notably, we do NOT choose nth node, and find the next valid node
    // if n is excluded - this will make the probability of the node immediately
    // after an excluded node higher.
    //
    // Start point is always 0 and that's fine, because the nth valid node
    // logic provides equal randomness.
    //
    // Consider this example, where 1,3,5 out of the 10 nodes are excluded:
    // 1 2 3 4 5 6 7 8 9 10
    // x   x   x
    // We will randomly choose the nth valid node where n is [0,6].
    // We do NOT choose a random number n and just use the closest valid node,
    // for example both n=3 and n=4 will choose 4, making it a 2/10 probability,
    // higher than the expected 1/7
    // totalInScopeNodes=10 and availableNodes=7 in this example.
    int nthValidToReturn = r.nextInt(availableNodes);
    LOG.debug("nthValidToReturn is {}", nthValidToReturn);
    Node ret =
        parentNode.getLeaf(r.nextInt(totalInScopeNodes), excludedScopeNode);
    if (!excludedNodes.contains(ret)) {
      // return if we're lucky enough to get a valid node at a random first pick
      LOG.debug("Chosen node {} from first random", ret);
      return ret;
    } else {
      ret = null;
    }
    Node lastValidNode = null;
    for (int i = 0; i < totalInScopeNodes; ++i) {
      ret = parentNode.getLeaf(i, excludedScopeNode);
      if (!excludedNodes.contains(ret)) {
        if (nthValidToReturn == 0) {
          break;
        }
        --nthValidToReturn;
        lastValidNode = ret;
      } else {
        LOG.debug("Node {} is excluded, continuing.", ret);
        ret = null;
      }
    }
    if (ret == null && lastValidNode != null) {
      LOG.error("BUG: Found lastValidNode {} but not nth valid node. "
              + "parentNode={}, excludedScopeNode={}, excludedNodes={}, "
              + "totalInScopeNodes={}, availableNodes={}, nthValidToReturn={}.",
          lastValidNode, parentNode, excludedScopeNode, excludedNodes,
          totalInScopeNodes, availableNodes, nthValidToReturn);
      ret = lastValidNode;
    }
    return ret;
  }

  /** return leaves in <i>scope</i>
   * @param scope a path string
   * @return leaves nodes under specific scope
   */
  public List<Node> getLeaves(String scope) {
    Node node = getNode(scope);
    List<Node> leafNodes = new ArrayList<Node>();
    if (!(node instanceof InnerNode)) {
      leafNodes.add(node);
    } else {
      InnerNode innerNode = (InnerNode) node;
      for (int i=0;i<innerNode.getNumOfLeaves();i++) {
        leafNodes.add(innerNode.getLeaf(i, null));
      }
    }
    return leafNodes;
  }

  /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
   * if scope starts with ~, return the number of nodes that are not
   * in <i>scope</i> and <i>excludedNodes</i>; 
   * @param scope a path string that may start with ~
   * @param excludedNodes a list of nodes
   * @return number of available nodes
   */
  @VisibleForTesting
  public int countNumOfAvailableNodes(String scope,
                                      Collection<Node> excludedNodes) {
    boolean isExcluded=false;
    if (scope.startsWith("~")) {
      isExcluded=true;
      scope=scope.substring(1);
    }
    scope = NodeBase.normalize(scope);
    int excludedCountInScope = 0; // the number of nodes in both scope & excludedNodes
    int excludedCountOffScope = 0; // the number of nodes outside scope & excludedNodes
    netlock.readLock().lock();
    try {
      if (excludedNodes != null) {
        for (Node node : excludedNodes) {
          node = getNode(NodeBase.getPath(node));
          if (node == null) {
            continue;
          }
          if (isNodeInScope(node, scope)) {
            if (node instanceof InnerNode) {
              excludedCountInScope += ((InnerNode) node).getNumOfLeaves();
            } else {
              excludedCountInScope++;
            }
          } else {
            excludedCountOffScope++;
          }
        }
      }
      Node n = getNode(scope);
      int scopeNodeCount = 0;
      if (n != null) {
        scopeNodeCount++;
      }
      if (n instanceof InnerNode) {
        scopeNodeCount=((InnerNode)n).getNumOfLeaves();
      }
      if (isExcluded) {
        return clusterMap.getNumOfLeaves() - scopeNodeCount
            - excludedCountOffScope;
      } else {
        return scopeNodeCount - excludedCountInScope;
      }
    } finally {
      netlock.readLock().unlock();
    }
  }

  /** convert a network tree to a string. */
  @Override
  public String toString() {
    // print the number of racks
    StringBuilder tree = new StringBuilder();
    tree.append("Number of racks: ")
        .append(numOfRacks)
        .append("\n");
    // print the number of leaves
    int numOfLeaves = getNumOfLeaves();
    tree.append("Expected number of leaves:")
        .append(numOfLeaves)
        .append("\n");
    // print nodes
    for(int i=0; i<numOfLeaves; i++) {
      tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)))
          .append("\n");
    }
    return tree.toString();
  }
  
  /**
   * @return Divide networklocation string into two parts by last separator, and get
   * the first part here.
   * 
   * @param networkLocation input networkLocation.
   */
  public static String getFirstHalf(String networkLocation) {
    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
    return networkLocation.substring(0, index);
  }

  /**
   * @return Divide networklocation string into two parts by last separator, and get
   * the second part here.
   * 
   * @param networkLocation input networkLocation.
   */
  public static String getLastHalf(String networkLocation) {
    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
    return networkLocation.substring(index);
  }

  /**
   * Returns an integer weight which specifies how far away {node} is away from
   * {reader}. A lower value signifies that a node is closer.
   * 
   * @param reader Node where data will be read
   * @param node Replica of data
   * @return weight
   */
  @VisibleForTesting
  protected int getWeight(Node reader, Node node) {
    // 0 is local, 2 is same rack, and each level on each node increases the
    //weight by 1
    //Start off by initializing to Integer.MAX_VALUE
    int weight = Integer.MAX_VALUE;
    if (reader != null && node != null) {
      if(reader.equals(node)) {
        return 0;
      }
      int maxReaderLevel = reader.getLevel();
      int maxNodeLevel = node.getLevel();
      int currentLevelToCompare = maxReaderLevel > maxNodeLevel ?
          maxNodeLevel : maxReaderLevel;
      Node r = reader;
      Node n = node;
      weight = 0;
      while(r != null && r.getLevel() > currentLevelToCompare) {
        r = r.getParent();
        weight++;
      }
      while(n != null && n.getLevel() > currentLevelToCompare) {
        n = n.getParent();
        weight++;
      }
      while(r != null && n != null && !r.equals(n)) {
        r = r.getParent();
        n = n.getParent();
        weight+=2;
      }
    }
    return weight;
  }

  /**
   * Returns an integer weight which specifies how far away <i>node</i> is
   * from <i>reader</i>. A lower value signifies that a node is closer.
   * It uses network location to calculate the weight
   *
   * @param reader Node where data will be read
   * @param node Replica of data
   * @return weight
   */
  @VisibleForTesting
  protected static int getWeightUsingNetworkLocation(Node reader, Node node) {
    //Start off by initializing to Integer.MAX_VALUE
    int weight = Integer.MAX_VALUE;
    if(reader != null && node != null) {
      String readerPath = normalizeNetworkLocationPath(
          reader.getNetworkLocation());
      String nodePath = normalizeNetworkLocationPath(
          node.getNetworkLocation());

      //same rack
      if(readerPath.equals(nodePath)) {
        if(reader.getName().equals(node.getName())) {
          weight = 0;
        } else {
          weight = 2;
        }
      } else {
        String[] readerPathToken = readerPath.split(PATH_SEPARATOR_STR);
        String[] nodePathToken = nodePath.split(PATH_SEPARATOR_STR);
        int maxLevelToCompare = readerPathToken.length > nodePathToken.length ?
            nodePathToken.length : readerPathToken.length;
        int currentLevel = 1;
        //traverse through the path and calculate the distance
        while(currentLevel < maxLevelToCompare) {
          if(!readerPathToken[currentLevel]
              .equals(nodePathToken[currentLevel])){
            break;
          }
          currentLevel++;
        }
        // +2 to correct the weight between reader and node rather than
        // between parent of reader and parent of node.
        weight = (readerPathToken.length - currentLevel) +
            (nodePathToken.length - currentLevel) + 2;
      }
    }
    return weight;
  }

  /** Normalize a path by stripping off any trailing {@link #PATH_SEPARATOR}.
   * @param path path to normalize.
   * @return the normalised path
   * If <i>path</i>is null or empty {@link #ROOT} is returned
   * @throws IllegalArgumentException if the first character of a non empty path
   * is not {@link #PATH_SEPARATOR}
   */
  private static String normalizeNetworkLocationPath(String path) {
    if (path == null || path.length() == 0) {
      return ROOT;
    }

    if (path.charAt(0) != PATH_SEPARATOR) {
      throw new IllegalArgumentException("Network Location"
          + "path doesn't start with " +PATH_SEPARATOR+ ": "+path);
    }

    int len = path.length();
    if (path.charAt(len-1) == PATH_SEPARATOR) {
      return path.substring(0, len-1);
    }
    return path;
  }

  /**
   * Sort nodes array by network distance to <i>reader</i>.
   * <p>
   * In a three-level topology, a node can be either local, on the same rack,
   * or on a different rack from the reader. Sorting the nodes based on network
   * distance from the reader reduces network traffic and improves
   * performance.
   * <p>
   * As an additional twist, we also randomize the nodes at each network
   * distance. This helps with load balancing when there is data skew.
   *
   * @param reader    Node where data will be read
   * @param nodes     Available replicas with the requested data
   * @param activeLen Number of active nodes at the front of the array
   */
  public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
    /*
     * This method is called if the reader is a datanode,
     * so nonDataNodeReader flag is set to false.
     */
    sortByDistance(reader, nodes, activeLen, null);
  }

  /**
   * Sort nodes array by network distance to <i>reader</i> with secondary sort.
   * <p>
   * In a three-level topology, a node can be either local, on the same rack,
   * or on a different rack from the reader. Sorting the nodes based on network
   * distance from the reader reduces network traffic and improves
   * performance.
   * </p>
   * As an additional twist, we also randomize the nodes at each network
   * distance. This helps with load balancing when there is data skew.
   *
   * @param reader    Node where data will be read
   * @param nodes     Available replicas with the requested data
   * @param activeLen Number of active nodes at the front of the array
   * @param secondarySort a secondary sorting strategy which can inject into
   *     that point from outside to help sort the same distance.
   * @param <T> Generics Type T
   */
  public <T extends Node> void sortByDistance(Node reader, T[] nodes,
      int activeLen, Consumer<List<T>> secondarySort){
    sortByDistance(reader, nodes, activeLen, secondarySort, false);
  }

  /**
   * Sort nodes array by network distance to <i>reader</i> with secondary sort.
   * <p> using network location. This is used when the reader
   * is not a datanode. Sorting the nodes based on network distance
   * from the reader reduces network traffic and improves
   * performance.
   * </p>
   *
   * @param reader    Node where data will be read
   * @param nodes     Available replicas with the requested data
   * @param activeLen Number of active nodes at the front of the array
   */
  public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes,
      int activeLen) {
    /*
     * This method is called if the reader is not a datanode,
     * so nonDataNodeReader flag is set to true.
     */
    sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, null);
  }

  /**
   * Sort nodes array by network distance to <i>reader</i>.
   * <p> using network location. This is used when the reader
   * is not a datanode. Sorting the nodes based on network distance
   * from the reader reduces network traffic and improves
   * performance.
   * </p>
   *
   * @param reader    Node where data will be read
   * @param nodes     Available replicas with the requested data
   * @param activeLen Number of active nodes at the front of the array
   * @param secondarySort a secondary sorting strategy which can inject into
   *     that point from outside to help sort the same distance.
   * @param <T> Generics Type T.
   */
  public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
      T[] nodes, int activeLen, Consumer<List<T>> secondarySort) {
    sortByDistance(reader, nodes, activeLen, secondarySort, true);
  }

  /**
   * Sort nodes array by network distance to <i>reader</i>.
   * <p>
   * As an additional twist, we also randomize the nodes at each network
   * distance. This helps with load balancing when there is data skew.
   * And it helps choose node with more fast storage type.
   *
   * @param reader    Node where data will be read
   * @param nodes     Available replicas with the requested data
   * @param activeLen Number of active nodes at the front of the array
   * @param nonDataNodeReader True if the reader is not a datanode
   */
  private <T extends Node> void sortByDistance(Node reader, T[] nodes,
      int activeLen, Consumer<List<T>> secondarySort,
      boolean nonDataNodeReader) {
    /** Sort weights for the nodes array */
    TreeMap<Integer, List<T>> weightedNodeTree =
        new TreeMap<>();
    int nWeight;
    for (int i = 0; i < activeLen; i++) {
      if (nonDataNodeReader) {
        nWeight = getWeightUsingNetworkLocation(reader, nodes[i]);
      } else {
        nWeight = getWeight(reader, nodes[i]);
      }
      weightedNodeTree.computeIfAbsent(
          nWeight, k -> new ArrayList<>(1)).add(nodes[i]);
    }
    int idx = 0;
    // Sort nodes which have the same weight using secondarySort.
    for (List<T> nodesList : weightedNodeTree.values()) {
      Collections.shuffle(nodesList, getRandom());
      if (secondarySort != null) {
        // a secondary sort breaks the tie between nodes.
        secondarySort.accept(nodesList);
      }
      for (T n : nodesList) {
        nodes[idx++] = n;
      }
    }
    Preconditions.checkState(idx == activeLen,
        "Sorted the wrong number of nodes!");
  }

  /**
   * Checks whether one scope is contained in the other scope.
   * @param parentScope the parent scope to check
   * @param childScope  the child scope which needs to be checked.
   * @return true if childScope is contained within the parentScope
   */
  protected static boolean isChildScope(final String parentScope,
      final String childScope) {
    String pScope = parentScope.endsWith(NodeBase.PATH_SEPARATOR_STR) ?
        parentScope :  parentScope + NodeBase.PATH_SEPARATOR_STR;
    String cScope = childScope.endsWith(NodeBase.PATH_SEPARATOR_STR) ?
        childScope :  childScope + NodeBase.PATH_SEPARATOR_STR;
    return pScope.startsWith(cScope);
  }

  /**
   * Checks whether a node belongs to the scope.
   * @param node  the node to check.
   * @param scope scope to check.
   * @return true if node lies within the scope
   */
  protected static boolean isNodeInScope(Node node, String scope) {
    if (!scope.endsWith(NodeBase.PATH_SEPARATOR_STR)) {
      scope += NodeBase.PATH_SEPARATOR_STR;
    }
    String nodeLocation = NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR;
    return nodeLocation.startsWith(scope);
  }

  /** @return the number of nonempty racks */
  public int getNumOfNonEmptyRacks() {
    return numOfRacks - numOfEmptyRacks;
  }

  /**
   * Update empty rack number when add a node like recommission.
   * @param node node to be added; can be null
   */
  public void recommissionNode(Node node) {
    if (node == null) {
      return;
    }
    if (node instanceof InnerNode) {
      throw new IllegalArgumentException(
          "Not allow to remove an inner node: " + NodeBase.getPath(node));
    }
    netlock.writeLock().lock();
    try {
      decommissionNodes.remove(node.getName());
      interAddNodeWithEmptyRack(node);
    } finally {
      netlock.writeLock().unlock();
    }
  }

  /**
   * Update empty rack number when remove a node like decommission.
   * @param node node to be added; can be null
   */
  public void decommissionNode(Node node) {
    if (node == null) {
      return;
    }
    if (node instanceof InnerNode) {
      throw new IllegalArgumentException(
          "Not allow to remove an inner node: " + NodeBase.getPath(node));
    }
    netlock.writeLock().lock();
    try {
      decommissionNodes.add(node.getName());
      interRemoveNodeWithEmptyRack(node);
    } finally {
      netlock.writeLock().unlock();
    }
  }

  /**
   * Internal function for update empty rack number
   * for add or recommission a node.
   * @param node node to be added; can be null
   */
  private void interAddNodeWithEmptyRack(Node node) {
    if (node == null) {
      return;
    }
    String rackname = node.getNetworkLocation();
    Set<String> nodes = rackMap.get(rackname);
    if (nodes == null) {
      nodes = new HashSet<>();
    }
    if (!decommissionNodes.contains(node.getName())) {
      nodes.add(node.getName());
    }
    rackMap.put(rackname, nodes);
    countEmptyRacks();
  }

  /**
   * Internal function for update empty rack number
   * for remove or decommission a node.
   * @param node node to be removed; can be null
   */
  private void interRemoveNodeWithEmptyRack(Node node) {
    if (node == null) {
      return;
    }
    String rackname = node.getNetworkLocation();
    Set<String> nodes = rackMap.get(rackname);
    if (nodes != null) {
      InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
      if (rack == null) {
        // this node and its rack are both removed.
        rackMap.remove(rackname);
      } else if (nodes.contains(node.getName())) {
        // this node is decommissioned or removed.
        nodes.remove(node.getName());
        rackMap.put(rackname, nodes);
      }
      countEmptyRacks();
    }
  }

  private void countEmptyRacks() {
    int count = 0;
    for (Set<String> nodes : rackMap.values()) {
      if (nodes != null && nodes.isEmpty()) {
        count++;
      }
    }
    numOfEmptyRacks = count;
    LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractDNSToSwitchMapping 源码

hadoop CachedDNSToSwitchMapping 源码

hadoop ConnectTimeoutException 源码

hadoop DNS 源码

hadoop DNSDomainNameResolver 源码

hadoop DNSToSwitchMapping 源码

hadoop DNSToSwitchMappingWithDependency 源码

hadoop DomainNameResolver 源码

hadoop DomainNameResolverFactory 源码

hadoop InnerNode 源码

0  赞