hadoop ReplicaMap 源码

  • 2022-10-20
  • 浏览 (189)

hadoop ReplicaMap 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.common.NoLockManager;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.util.LightWeightResizableGSet;

/**
 * Maintains the replica map: for each block pool id, a set of
 * {@link ReplicaInfo} entries looked up by block.
 *
 * <p>Unless a method states otherwise, it acquires a block-pool level lock
 * from the {@link DataNodeLockManager} given at construction time for the
 * duration of its access to the map.
 */
class ReplicaMap {
  // Supplies the block-pool level locks taken around accesses to the map.
  private final DataNodeLockManager<AutoCloseDataSetLock> lockManager;

  // Map of block pool Id to another map of block Id to ReplicaInfo.
  private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
      new ConcurrentHashMap<>();

  /**
   * Create a replica map whose accesses are guarded by the given lock manager.
   *
   * @param manager lock manager used to obtain block-pool level locks
   * @throws HadoopIllegalArgumentException if {@code manager} is null
   */
  ReplicaMap(DataNodeLockManager<AutoCloseDataSetLock> manager) {
    if (manager == null) {
      throw new HadoopIllegalArgumentException(
          "Object to synchronize on cannot be null");
    }
    this.lockManager = manager;
  }

  /**
   * Create a replica map backed by a {@link NoLockManager}. Intended for unit
   * tests or temporary replica maps that do not need to be protected by a
   * lock.
   */
  ReplicaMap() {
    this.lockManager = new NoLockManager();
  }

  /**
   * Get the ids of all block pools currently present in the map.
   * This method does not acquire any lock.
   *
   * @return a newly allocated array of block pool ids
   */
  String[] getBlockPoolList() {
    // Let toArray size the result itself. Pre-sizing the array from a
    // separate size() call can leave a trailing null element if a block pool
    // is removed from the concurrent map between the two calls.
    return map.keySet().toArray(new String[0]);
  }

  /**
   * Validate a block pool id.
   *
   * @param bpid block pool id
   * @throws IllegalArgumentException if {@code bpid} is null
   */
  private void checkBlockPool(String bpid) {
    if (bpid == null) {
      throw new IllegalArgumentException("Block Pool Id is null");
    }
  }

  /**
   * Validate a block.
   *
   * @param b block to validate
   * @throws IllegalArgumentException if {@code b} is null
   */
  private void checkBlock(Block b) {
    if (b == null) {
      throw new IllegalArgumentException("Block is null");
    }
  }

  /**
   * Return the replica set of the given block pool, creating and registering
   * an empty one if the block pool has no entry yet. The lookup and the
   * creation happen as one atomic map operation, so the result is never null.
   * Callers are expected to hold the appropriate lock already.
   *
   * @param bpid block pool id, must not be null
   * @return the replica set registered for {@code bpid}
   */
  private LightWeightResizableGSet<Block, ReplicaInfo> getOrCreateSet(
      String bpid) {
    return map.computeIfAbsent(bpid,
        k -> new LightWeightResizableGSet<Block, ReplicaInfo>());
  }

  /**
   * Get the meta information of the replica that matches both block id
   * and generation stamp.
   *
   * @param bpid block pool id
   * @param block block with its id as the key
   * @return the replica's meta information, or null if there is no replica
   *         with that block id or its generation stamp differs
   * @throws IllegalArgumentException if the input block or block pool is null
   */
  ReplicaInfo get(String bpid, Block block) {
    checkBlockPool(bpid);
    checkBlock(block);
    ReplicaInfo replicaInfo = get(bpid, block.getBlockId());
    if (replicaInfo != null &&
        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
      return replicaInfo;
    }
    return null;
  }

  /**
   * Get the meta information of the replica that matches the block id.
   * The lookup runs under the block pool's read lock.
   *
   * @param bpid block pool id
   * @param blockId a block's id
   * @return the replica's meta information, or null if the block pool or the
   *         block is not present
   * @throws IllegalArgumentException if the block pool id is null
   */
  ReplicaInfo get(String bpid, long blockId) {
    checkBlockPool(bpid);
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
      return m != null ? m.get(new Block(blockId)) : null;
    }
  }

  /**
   * Add a replica's meta information into the map. An entry for the block
   * pool is created if it does not exist already.
   *
   * @param bpid block pool id
   * @param replicaInfo a replica's meta information
   * @return previous meta information of the replica
   * @throws IllegalArgumentException if the input parameter is null
   */
  ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
    checkBlockPool(bpid);
    checkBlock(replicaInfo);
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      return getOrCreateSet(bpid).put(replicaInfo);
    }
  }

  /**
   * Add a replica's meta information into the map unless an entry for it
   * already exists. An entry for the block pool is created if it does not
   * exist already.
   *
   * @param bpid block pool id
   * @param replicaInfo a replica's meta information
   * @return the already stored replica info if one exists (the map is left
   *         unchanged in that case), otherwise {@code replicaInfo} after it
   *         has been added
   * @throws IllegalArgumentException if the input parameter is null
   */
  ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
    checkBlockPool(bpid);
    checkBlock(replicaInfo);
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      LightWeightResizableGSet<Block, ReplicaInfo> m = getOrCreateSet(bpid);
      ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
      if (oldReplicaInfo != null) {
        return oldReplicaInfo;
      }
      m.put(replicaInfo);
      return replicaInfo;
    }
  }

  /**
   * Add all entries from the given replica map into the local replica map.
   * For every block pool in {@code other}, the local entry is replaced by
   * the very same replica set instance held by {@code other}; the sets are
   * not copied. This method does not acquire any lock.
   *
   * @param other replica map whose block pool entries are added
   */
  void addAll(ReplicaMap other) {
    map.putAll(other.map);
  }

  /**
   * Merge all entries from the given replica map into the local replica map.
   * Unlike {@link #addAll(ReplicaMap)}, replicas are put one by one into the
   * local set of each block pool. Each block pool is processed under its
   * write lock.
   *
   * @param other replica map whose replicas are merged in
   */
  void mergeAll(ReplicaMap other) {
    Set<String> bplist = other.map.keySet();
    for (String bp : bplist) {
      checkBlockPool(bp);
      try (AutoCloseDataSetLock lock =
          lockManager.writeLock(LockLevel.BLOCK_POOl, bp)) {
        LightWeightResizableGSet<Block, ReplicaInfo> replicaInfos =
            other.map.get(bp);
        LightWeightResizableGSet<Block, ReplicaInfo> curSet = map.get(bp);
        // Copy into a plain set first: adding to a GSet while iterating
        // another GSet may cause an endless loop.
        Set<ReplicaInfo> replicaSet = new HashSet<>();
        for (ReplicaInfo replicaInfo : replicaInfos) {
          replicaSet.add(replicaInfo);
        }
        for (ReplicaInfo replicaInfo : replicaSet) {
          checkBlock(replicaInfo);
          if (curSet == null) {
            // Add an entry for block pool if it does not exist already
            curSet = new LightWeightResizableGSet<>();
            map.put(bp, curSet);
          }
          curSet.put(replicaInfo);
        }
      }
    }
  }

  /**
   * Remove the replica's meta information from the map that matches
   * the input block's id and generation stamp.
   *
   * @param bpid block pool id
   * @param block block with its id as the key
   * @return the removed replica's meta information, or null if nothing
   *         matched and the map was left unchanged
   * @throws IllegalArgumentException if the input block or block pool is null
   */
  ReplicaInfo remove(String bpid, Block block) {
    checkBlockPool(bpid);
    checkBlock(block);
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
      if (m != null) {
        ReplicaInfo replicaInfo = m.get(block);
        if (replicaInfo != null &&
            block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
          return m.remove(block);
        }
      }
    }

    return null;
  }

  /**
   * Remove the replica's meta information from the map if present.
   *
   * @param bpid block pool id
   * @param blockId block id of the replica to be removed
   * @return the removed replica's meta information, or null if the block
   *         pool has no entry in the map
   * @throws IllegalArgumentException if the block pool id is null
   */
  ReplicaInfo remove(String bpid, long blockId) {
    checkBlockPool(bpid);
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
      if (m != null) {
        return m.remove(new Block(blockId));
      }
    }
    return null;
  }

  /**
   * Get the size of the map for given block pool.
   *
   * @param bpid block pool id
   * @return the number of replicas in the map, or 0 if the block pool has
   *         no entry
   */
  int size(String bpid) {
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
      return m != null ? m.size() : 0;
    }
  }

  /**
   * Get a collection of the replicas for given block pool.
   * This method is <b>not synchronized</b>. If you want to keep thread safe
   * use method {@link #replicas(String, Consumer)}.
   *
   * @param bpid block pool id
   * @return a collection of the replicas belonging to the block pool, or
   *         null if the block pool has no entry in the map
   */
  Collection<ReplicaInfo> replicas(String bpid) {
    LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
    return m != null ? m.values() : null;
  }

  /**
   * Execute the given function for one block pool, protected by the
   * LockManager. This method is <b>synchronized</b>: the consumer is handed
   * to the block pool's replica set while the block pool's read lock is
   * held. Nothing happens if the block pool has no entry in the map.
   *
   * @param bpid block pool id
   * @param consumer function that receives an iterator over the replicas
   */
  void replicas(String bpid, Consumer<Iterator<ReplicaInfo>> consumer) {
    try (AutoCloseDataSetLock lock =
        lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
      if (m != null) {
        m.getIterator(consumer);
      }
    }
  }

  /**
   * Make sure the map has an entry for the given block pool, creating an
   * empty replica set if there is none. An existing entry is left untouched.
   * Runs under the block pool's write lock.
   *
   * @param bpid block pool id
   * @throws IllegalArgumentException if the block pool id is null
   */
  void initBlockPool(String bpid) {
    checkBlockPool(bpid);
    try (AutoCloseDataSetLock lock =
        lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
      // Add an entry for block pool if it does not exist already
      getOrCreateSet(bpid);
    }
  }

  /**
   * Remove the given block pool and all of its replicas from the map.
   * Runs under the block pool's write lock.
   *
   * @param bpid block pool id
   * @throws IllegalArgumentException if the block pool id is null
   */
  void cleanUpBlockPool(String bpid) {
    checkBlockPool(bpid);
    try (AutoCloseDataSetLock lock =
        lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
      map.remove(bpid);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AddBlockPoolException 源码

hadoop BlockPoolSlice 源码

hadoop CacheStats 源码

hadoop FsDatasetAsyncDiskService 源码

hadoop FsDatasetCache 源码

hadoop FsDatasetFactory 源码

hadoop FsDatasetImpl 源码

hadoop FsDatasetUtil 源码

hadoop FsVolumeImpl 源码

hadoop FsVolumeImplBuilder 源码

0  赞