hadoop StripedReconstructor source code

  • 2022-10-20

hadoop StripedReconstructor code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.concurrent.CompletionService;
import java.util.concurrent.atomic.AtomicLong;

/**
 * StripedReconstructor reconstructs one or more missing striped blocks in
 * a striped block group; the number of live striped blocks must be no
 * less than the number of data blocks.
 *
 * | <- Striped Block Group -> |
 *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
 *    |          |           |          |
 *    v          v           v          v
 * +------+   +------+   +------+   +------+
 * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...
 * +------+   +------+   +------+   +------+
 * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
 * +------+   +------+   +------+   +------+
 * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
 * +------+   +------+   +------+   +------+
 *  ...         ...       ...         ...
 *
 *
 * We use the following steps to reconstruct a striped block group. In each
 * round we reconstruct <code>bufferSize</code> bytes until we finish; the
 * <code>bufferSize</code> is configurable and may be smaller or larger than
 * the cell size:
 * step 1: read <code>bufferSize</code> bytes from the minimum number of
 *         sources required by reconstruction.
 * step 2: decode the data for the targets.
 * step 3: transfer the data to the targets.
 *
 * In step 1, we try to read <code>bufferSize</code> bytes from the minimum
 * number of sources; if there are corrupt or stale sources, reads from new
 * sources will be scheduled. The best sources are remembered for the next
 * round and may be updated in each round.
 *
 * In step 2, if the source blocks we read are all data blocks, we typically
 * need to call encode; if at least one of them is a parity block, we need
 * to call decode. Note that we read only once and reconstruct all of the
 * missing striped blocks, even if there is more than one.
 *
 * In step 3, the reconstructed data are sent to the targets by constructing
 * packets and sending them directly. As with contiguous block replication,
 * we don't check the packet acks. Since the datanode doing the
 * reconstruction work is one of the source datanodes, the reconstructed
 * data are sent to the targets remotely.
 *
 * There are some points where we can make further improvements in a later
 * phase:
 * 1. We could read the block file directly on the local datanode;
 *    currently we use a remote block reader. (Note that short-circuit
 *    reads are not a good choice; see the inline comments.)
 * 2. Should we check the packet acks for EC reconstruction? Since EC
 *    reconstruction is more expensive than contiguous block replication
 *    and needs to read from several other datanodes, should we make sure
 *    the reconstructed result is received by the targets?
 */
@InterfaceAudience.Private
abstract class StripedReconstructor {
  protected static final Logger LOG = DataNode.LOG;

  private final Configuration conf;
  private final DataNode datanode;
  private final ErasureCodingPolicy ecPolicy;
  private final ErasureCoderOptions coderOptions;
  private RawErasureDecoder decoder;
  private final ExtendedBlock blockGroup;
  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();

  private final boolean isValidationEnabled;
  private DecodingValidator validator;

  // position in striped internal block
  private long positionInBlock;
  private StripedReader stripedReader;
  private ErasureCodingWorker erasureCodingWorker;
  private final CachingStrategy cachingStrategy;
  private long maxTargetLength = 0L;
  private final BitSet liveBitSet;
  private final BitSet excludeBitSet;

  // metrics
  private AtomicLong bytesRead = new AtomicLong(0);
  private AtomicLong bytesWritten = new AtomicLong(0);
  private AtomicLong remoteBytesRead = new AtomicLong(0);

  StripedReconstructor(ErasureCodingWorker worker,
      StripedReconstructionInfo stripedReconInfo) {
    this.erasureCodingWorker = worker;
    this.datanode = worker.getDatanode();
    this.conf = worker.getConf();
    this.ecPolicy = stripedReconInfo.getEcPolicy();
    liveBitSet = new BitSet(
        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
    for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) {
      liveBitSet.set(stripedReconInfo.getLiveIndices()[i]);
    }
    excludeBitSet = new BitSet(
        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
    for (int i = 0; i < stripedReconInfo.getExcludeReconstructedIndices().length; i++) {
      excludeBitSet.set(stripedReconInfo.getExcludeReconstructedIndices()[i]);
    }

    blockGroup = stripedReconInfo.getBlockGroup();
    stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
    cachingStrategy = CachingStrategy.newDefaultStrategy();

    positionInBlock = 0L;

    coderOptions = new ErasureCoderOptions(
        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
    isValidationEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE)
        && !coderOptions.allowChangeInputs();
  }

  /**
   * Update the read metrics: every read counts toward {@code bytesRead},
   * and a non-local read also counts toward {@code remoteBytesRead}.
   */
  public void incrBytesRead(boolean local, long delta) {
    bytesRead.addAndGet(delta);
    if (!local) {
      remoteBytesRead.addAndGet(delta);
    }
  }

  public void incrBytesWritten(long delta) {
    bytesWritten.addAndGet(delta);
  }

  public long getBytesRead() {
    return bytesRead.get();
  }

  public long getRemoteBytesRead() {
    return remoteBytesRead.get();
  }

  public long getBytesWritten() {
    return bytesWritten.get();
  }

  /**
   * Reconstruct one or more missing striped blocks in the striped block
   * group; the number of live striped blocks must be no less than the
   * number of data blocks.
   *
   * @throws IOException if reconstruction fails.
   */
  abstract void reconstruct() throws IOException;

  boolean useDirectBuffer() {
    return decoder.preferDirectBuffer();
  }

  /**
   * Allocate a buffer from the shared pool; whether it is direct or
   * on-heap follows the decoder's preference.
   */
  ByteBuffer allocateBuffer(int length) {
    return BUFFER_POOL.getBuffer(useDirectBuffer(), length);
  }

  /** Return a buffer to the shared pool. */
  void freeBuffer(ByteBuffer buffer) {
    BUFFER_POOL.putBuffer(buffer);
  }

  ExtendedBlock getBlock(int i) {
    return StripedBlockUtil.constructInternalBlock(blockGroup, ecPolicy, i);
  }

  long getBlockLen(int i) {
    return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
        ecPolicy, i);
  }

  // Initialize decoder
  protected void initDecoderIfNecessary() {
    if (decoder == null) {
      decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
          coderOptions);
    }
  }

  // Initialize decoding validator
  protected void initDecodingValidatorIfNecessary() {
    if (isValidationEnabled && validator == null) {
      validator = new DecodingValidator(decoder);
    }
  }

  long getPositionInBlock() {
    return positionInBlock;
  }

  InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
    return NetUtils.createSocketAddr(dnInfo.getXferAddr(
        datanode.getDnConf().getConnectToDnViaHostname()));
  }

  int getBufferSize() {
    return stripedReader.getBufferSize();
  }

  public DataChecksum getChecksum() {
    return stripedReader.getChecksum();
  }

  CachingStrategy getCachingStrategy() {
    return cachingStrategy;
  }

  CompletionService<BlockReadStats> createReadService() {
    return erasureCodingWorker.createReadService();
  }

  ExtendedBlock getBlockGroup() {
    return blockGroup;
  }

  /**
   * Get the xmits that _will_ be used for this reconstruction task.
   */
  int getXmits() {
    return stripedReader.getXmits();
  }

  BitSet getLiveBitSet() {
    return liveBitSet;
  }

  BitSet getExcludeBitSet() {
    return excludeBitSet;
  }

  long getMaxTargetLength() {
    return maxTargetLength;
  }

  void setMaxTargetLength(long maxTargetLength) {
    this.maxTargetLength = maxTargetLength;
  }

  void updatePositionInBlock(long positionInBlockArg) {
    this.positionInBlock += positionInBlockArg;
  }

  RawErasureDecoder getDecoder() {
    return decoder;
  }

  int getNumLiveBlocks() {
    return liveBitSet.cardinality();
  }

  void cleanup() {
    if (decoder != null) {
      decoder.release();
    }
  }

  StripedReader getStripedReader() {
    return stripedReader;
  }

  Configuration getConf() {
    return conf;
  }

  DataNode getDatanode() {
    return datanode;
  }

  public ErasureCodingWorker getErasureCodingWorker() {
    return erasureCodingWorker;
  }

  @VisibleForTesting
  static ByteBufferPool getBufferPool() {
    return BUFFER_POOL;
  }

  boolean isValidationEnabled() {
    return isValidationEnabled;
  }

  DecodingValidator getValidator() {
    return validator;
  }

  /**
   * Mark the current position of each non-null buffer so that it can later
   * be restored with {@link #resetBuffers}.
   */
  protected static void markBuffers(ByteBuffer[] buffers) {
    for (ByteBuffer buffer: buffers) {
      if (buffer != null) {
        buffer.mark();
      }
    }
  }

  /**
   * Reset each non-null buffer to its previously marked position.
   */
  protected static void resetBuffers(ByteBuffer[] buffers) {
    for (ByteBuffer buffer: buffers) {
      if (buffer != null) {
        buffer.reset();
      }
    }
  }
}
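
The reconstruct() method above is abstract; the per-round loop is supplied by concrete subclasses such as StripedBlockReconstructor (see the related articles below). As a rough illustration of the three steps described in the class Javadoc, a round might look like the sketch below. This is a simplified sketch based only on the API shown above; reconstructTargets() and transferDataToTargets() are hypothetical stand-ins for the subclass's decode and transfer logic, not methods of StripedReconstructor:

// Simplified sketch of a reconstruct() implementation (not the actual
// StripedBlockReconstructor code). Each round reads, decodes, and
// transfers at most bufferSize bytes.
void reconstruct() throws IOException {
  initDecoderIfNecessary();
  initDecodingValidatorIfNecessary();
  while (getPositionInBlock() < getMaxTargetLength()) {
    // Step 1: read from the minimum number of sources required.
    long remaining = getMaxTargetLength() - getPositionInBlock();
    int toReconstructLen = (int) Math.min(getBufferSize(), remaining);
    getStripedReader().readMinimumSources(toReconstructLen);

    // Step 2: decode the missing internal blocks from the data read
    // (hypothetical stand-in for the subclass's decode step).
    reconstructTargets(toReconstructLen);

    // Step 3: send the reconstructed data to the target datanodes
    // (hypothetical stand-in for the subclass's transfer step).
    transferDataToTargets();

    updatePositionInBlock(toReconstructLen);
  }
}

Once the loop finishes, cleanup() releases the decoder, and any buffers obtained through allocateBuffer() are returned to the shared pool via freeBuffer().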

Related information

hadoop source directory

Related articles

hadoop ErasureCodingWorker source code

hadoop StripedBlockChecksumCompositeCrcReconstructor source code

hadoop StripedBlockChecksumMd5CrcReconstructor source code

hadoop StripedBlockChecksumReconstructor source code

hadoop StripedBlockReader source code

hadoop StripedBlockReconstructor source code

hadoop StripedBlockWriter source code

hadoop StripedReader source code

hadoop StripedReconstructionInfo source code

hadoop StripedWriter source code
