hadoop AbstractClusterStory 源码

  • 2022-10-20
  • 浏览 (229)

haddop AbstractClusterStory 代码

文件路径:/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/AbstractClusterStory.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * {@link AbstractClusterStory} provides a partial implementation of
 * {@link ClusterStory} by parsing the topology tree.
 */
public abstract class AbstractClusterStory implements ClusterStory {
  protected Set<MachineNode> machineNodes;
  protected Set<RackNode> rackNodes;
  protected MachineNode[] mNodesFlattened;
  protected Map<String, MachineNode> mNodeMap;
  protected Map<String, RackNode> rNodeMap;
  protected int maximumDistance = 0;
  
  @Override
  public Set<MachineNode> getMachines() {
    parseTopologyTree();
    return machineNodes;
  }
  
  @Override
  public synchronized Set<RackNode> getRacks() {
    parseTopologyTree();    
    return rackNodes;
  }
  
  @Override
  public synchronized MachineNode[] getRandomMachines(int expected, 
                                                      Random random) {
    if (expected == 0) {
      return new MachineNode[0];
    }

    parseTopologyTree();
    int total = machineNodes.size();
    int select = Math.min(expected, total);

    if (mNodesFlattened == null) {
      mNodesFlattened = machineNodes.toArray(new MachineNode[total]);
    }

    MachineNode[] retval = new MachineNode[select];
    int i = 0;
    while ((i != select) && (total != i + select)) {
      int index = random.nextInt(total - i);
      MachineNode tmp = mNodesFlattened[index];
      mNodesFlattened[index] = mNodesFlattened[total - i - 1];
      mNodesFlattened[total - i - 1] = tmp;
      ++i;
    }
    if (i == select) {
      System.arraycopy(mNodesFlattened, total - i, retval, 0, select);
    } else {
      System.arraycopy(mNodesFlattened, 0, retval, 0, select);
    }

    return retval;
  }
  
  protected synchronized void buildMachineNodeMap() {
    if (mNodeMap == null) {
      mNodeMap = new HashMap<String, MachineNode>(machineNodes.size());
      for (MachineNode mn : machineNodes) {
        mNodeMap.put(mn.getName(), mn);
      }
    }
  }
  
  @Override
  public MachineNode getMachineByName(String name) {
    buildMachineNodeMap();
    return mNodeMap.get(name);
  }
  
  @Override
  public int distance(Node a, Node b) {
    int lvl_a = a.getLevel();
    int lvl_b = b.getLevel();
    int retval = 0;
    if (lvl_a > lvl_b) {
      retval = lvl_a-lvl_b;
      for (int i=0; i<retval; ++i) {
        a = a.getParent();
      }
    } else if (lvl_a < lvl_b) {
      retval = lvl_b-lvl_a;
      for (int i=0; i<retval; ++i) {
        b = b.getParent();
      }      
    }
    
    while (a != b) {
      a = a.getParent();
      b = b.getParent();
      ++retval;
    }
    
    return retval;
  }
  
  protected synchronized void buildRackNodeMap() {
    if (rNodeMap == null) {
      rNodeMap = new HashMap<String, RackNode>(rackNodes.size());
      for (RackNode rn : rackNodes) {
        rNodeMap.put(rn.getName(), rn);
      }
    }
  }
  
  @Override
  public RackNode getRackByName(String name) {
    buildRackNodeMap();
    return rNodeMap.get(name);
  }
  
  @Override
  public int getMaximumDistance() {
    parseTopologyTree();
    return maximumDistance;
  }
  
  protected synchronized void parseTopologyTree() {
    if (machineNodes == null) {
      Node root = getClusterTopology();
      SortedSet<MachineNode> mNodes = new TreeSet<MachineNode>();
      SortedSet<RackNode> rNodes = new TreeSet<RackNode>();
      // dfs search of the tree.
      Deque<Node> unvisited = new ArrayDeque<Node>();
      Deque<Integer> distUnvisited = new ArrayDeque<Integer>();
      unvisited.add(root);
      distUnvisited.add(0);
      for (Node n = unvisited.poll(); n != null; n = unvisited.poll()) {
        int distance = distUnvisited.poll();
        if (n instanceof RackNode) {
          rNodes.add((RackNode) n);
          mNodes.addAll(((RackNode) n).getMachinesInRack());
          if (distance + 1 > maximumDistance) {
            maximumDistance = distance + 1;
          }
        } else if (n instanceof MachineNode) {
          mNodes.add((MachineNode) n);
          if (distance > maximumDistance) {
            maximumDistance = distance;
          }
        } else {
          for (Node child : n.getChildren()) {
            unvisited.addFirst(child);
            distUnvisited.addFirst(distance+1);
          }
        }
      }

      machineNodes = Collections.unmodifiableSortedSet(mNodes);
      rackNodes = Collections.unmodifiableSortedSet(rNodes);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Anonymizer 源码

hadoop CDFPiecewiseLinearRandomGenerator 源码

hadoop CDFRandomGenerator 源码

hadoop ClusterStory 源码

hadoop ClusterTopologyReader 源码

hadoop CurrentJHParser 源码

hadoop DeepCompare 源码

hadoop DeepInequalityException 源码

hadoop DefaultInputDemuxer 源码

hadoop DefaultOutputter 源码

0  赞