hadoop StripedReader source code

  • 2022-10-20

hadoop StripedReader code

File path: /hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;

/**
 * Manages striped readers that perform reading of block data from remote
 * DataNodes to serve input data for the erasure decoding.
 */
@InterfaceAudience.Private
class StripedReader {
  private static final Logger LOG = DataNode.LOG;

  private final int stripedReadTimeoutInMills;
  private final int stripedReadBufferSize;

  private StripedReconstructor reconstructor;
  private final DataNode datanode;
  private final Configuration conf;

  private final int dataBlkNum;
  private final int parityBlkNum;


  private DataChecksum checksum;
  // Striped read buffer size
  private int bufferSize;
  private int[] successList;

  private final int minRequiredSources;
  // the number of xmits used by the reconstruction task.
  private final int xmits;
  // The buffers and indices for striped blocks whose length is 0
  private ByteBuffer[] zeroStripeBuffers;
  private short[] zeroStripeIndices;

  // sources
  private final byte[] liveIndices;
  private final DatanodeInfo[] sources;

  private final List<StripedBlockReader> readers;

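  // Maps each in-flight read future to the array index of its reader.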
  private final Map<Future<BlockReadStats>, Integer> futures = new HashMap<>();
  private final CompletionService<BlockReadStats> readService;

  StripedReader(StripedReconstructor reconstructor, DataNode datanode,
      Configuration conf, StripedReconstructionInfo stripedReconInfo) {
    stripedReadTimeoutInMills = conf.getInt(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
    stripedReadBufferSize = conf.getInt(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT);

    this.reconstructor = reconstructor;
    this.datanode = datanode;
    this.conf = conf;

    dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
    parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();

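    // A block group shorter than a full stripe spans fewer than dataBlkNum
    // cells, so fewer live sources are required; the missing data units are
    // handled as all-zero stripes (see zeroStripeBuffers below).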
    int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1)
        / stripedReconInfo.getEcPolicy().getCellSize() + 1);
    minRequiredSources = Math.min(cellsNum, dataBlkNum);

    if (minRequiredSources < dataBlkNum) {
      int zeroStripNum = dataBlkNum - minRequiredSources;
      zeroStripeBuffers = new ByteBuffer[zeroStripNum];
      zeroStripeIndices = new short[zeroStripNum];
    }

    // Calculated as the maximum of the number of connections to sources
    // and the number of connections to targets.
    xmits = Math.max(minRequiredSources,
        stripedReconInfo.getTargets() != null ?
        stripedReconInfo.getTargets().length : 0);

    this.liveIndices = stripedReconInfo.getLiveIndices();
    assert liveIndices != null;
    this.sources = stripedReconInfo.getSources();
    assert sources != null;

    readers = new ArrayList<>(sources.length);
    readService = reconstructor.createReadService();

    Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
        "Not enough live striped blocks.");
    Preconditions.checkArgument(liveIndices.length == sources.length,
        "liveBlockIndices and source datanodes should match");
  }

  void init() throws IOException {
    initReaders();

    initBufferSize();

    initZeroStrip();
  }

  private void initReaders() throws IOException {
    // Store the array indices of the source DNs we have read from
    // successfully. In each read iteration, successList may be updated if
    // some source DN turns out to be corrupted or slow; the updated list
    // is then used for the next iteration's read.
    successList = new int[minRequiredSources];

    StripedBlockReader reader;
    int nSuccess = 0;
    for (int i = 0; i < sources.length && nSuccess < minRequiredSources; i++) {
      reader = createReader(i, 0);
      readers.add(reader);
      if (reader.getBlockReader() != null) {
        initOrVerifyChecksum(reader);
        successList[nSuccess++] = i;
      }
    }

    if (nSuccess < minRequiredSources) {
      String error = "Can't find minimum sources required by "
          + "reconstruction, block id: "
          + reconstructor.getBlockGroup().getBlockId();
      throw new IOException(error);
    }
  }

  StripedBlockReader createReader(int idxInSources, long offsetInBlock) {
    return new StripedBlockReader(this, datanode,
        conf, liveIndices[idxInSources],
        reconstructor.getBlock(liveIndices[idxInSources]),
        sources[idxInSources], offsetInBlock);
  }

  private void initBufferSize() {
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    // Round the buffer size down to a multiple of bytesPerChecksum,
    // but never below one checksum chunk.
    int readBufferSize = stripedReadBufferSize;
    bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
        readBufferSize - readBufferSize % bytesPerChecksum;
  }

  // init checksum from block reader
  private void initOrVerifyChecksum(StripedBlockReader reader) {
    if (checksum == null) {
      checksum = reader.getBlockReader().getDataChecksum();
    } else {
      assert reader.getBlockReader().getDataChecksum().equals(checksum);
    }
  }

  protected ByteBuffer allocateReadBuffer() {
    return reconstructor.allocateBuffer(getBufferSize());
  }

  private void initZeroStrip() {
    if (zeroStripeBuffers != null) {
      for (int i = 0; i < zeroStripeBuffers.length; i++) {
        zeroStripeBuffers[i] = reconstructor.allocateBuffer(bufferSize);
      }
    }

    BitSet bitset = reconstructor.getLiveBitSet();
    int k = 0;
    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
      if (!bitset.get(i)) {
        if (reconstructor.getBlockLen(i) <= 0) {
          zeroStripeIndices[k++] = (short)i;
        }
      }
    }
  }

  private int getReadLength(int index, int reconstructLength) {
    // the reading length should not exceed the length for reconstruction
    long blockLen = reconstructor.getBlockLen(index);
    long remaining = blockLen - reconstructor.getPositionInBlock();
    return (int) Math.min(remaining, reconstructLength);
  }

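  // Assembles the decoder's input array: inputs[] is indexed by each block's
  // position within the block group; entries that were not read (the erased
  // units being reconstructed) are left null for the erasure decoder.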
  ByteBuffer[] getInputBuffers(int toReconstructLen) {
    ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];

    for (int i = 0; i < successList.length; i++) {
      int index = successList[i];
      StripedBlockReader reader = getReader(index);
      ByteBuffer buffer = reader.getReadBuffer();
      paddingBufferToLen(buffer, toReconstructLen);
      inputs[reader.getIndex()] = (ByteBuffer)buffer.flip();
    }

    if (successList.length < dataBlkNum) {
      for (int i = 0; i < zeroStripeBuffers.length; i++) {
        ByteBuffer buffer = zeroStripeBuffers[i];
        paddingBufferToLen(buffer, toReconstructLen);
        int index = zeroStripeIndices[i];
        inputs[index] = (ByteBuffer)buffer.flip();
      }
    }

    return inputs;
  }

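  // Zero-pads the buffer up to len; needed for the last, partial stripe,
  // where a source block ends before the full reconstruction length.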
  private void paddingBufferToLen(ByteBuffer buffer, int len) {
    if (len > buffer.limit()) {
      buffer.limit(len);
    }
    int toPadding = len - buffer.position();
    for (int i = 0; i < toPadding; i++) {
      buffer.put((byte) 0);
    }
  }

  /**
   * Read from the minimum number of source DNs required for reconstruction
   * in this iteration. First try the success list, which holds the DNs we
   * currently consider best. If a source DN is corrupt or slow, try to read
   * from some other source DN, and update the success list.
   *
   * The updated success list is remembered and reused for the following
   * operations and for the next iteration's read.
   *
   * @param reconstructLength the length to reconstruct.
   * @throws IOException if data cannot be read from the minimum number of
   *         sources required by reconstruction.
   */
  void readMinimumSources(int reconstructLength) throws IOException {
    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
    try {
      successList = doReadMinimumSources(reconstructLength, corruptedBlocks);
    } finally {
      // report corrupted blocks to NN
      datanode.reportCorruptedBlocks(corruptedBlocks);
    }
  }

  int[] doReadMinimumSources(int reconstructLength,
                             CorruptedBlocks corruptedBlocks)
      throws IOException {
    Preconditions.checkArgument(reconstructLength >= 0 &&
        reconstructLength <= bufferSize);
    int nSuccess = 0;
    int[] newSuccess = new int[minRequiredSources];
    BitSet usedFlag = new BitSet(sources.length);
    /*
     * Read from the minimum number of source DNs required; the success list
     * contains the source DNs we currently consider best.
     */
    for (int i = 0; i < minRequiredSources; i++) {
      StripedBlockReader reader = readers.get(successList[i]);
      int toRead = getReadLength(liveIndices[successList[i]],
          reconstructLength);
      if (toRead > 0) {
        Callable<BlockReadStats> readCallable =
            reader.readFromBlock(toRead, corruptedBlocks);
        Future<BlockReadStats> f = readService.submit(readCallable);
        futures.put(f, successList[i]);
      } else {
        // If the read length is 0, we don't need to do a real read
        reader.getReadBuffer().position(0);
        newSuccess[nSuccess++] = successList[i];
      }
      usedFlag.set(successList[i]);
    }

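    // Wait for the outstanding reads to complete; on failure or timeout,
    // close the bad reader and swap in a replacement source via
    // scheduleNewRead(), until minRequiredSources reads have succeeded.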
    while (!futures.isEmpty()) {
      try {
        StripingChunkReadResult result =
            StripedBlockUtil.getNextCompletedStripedRead(
                readService, futures, stripedReadTimeoutInMills);
        int resultIndex = -1;
        if (result.state == StripingChunkReadResult.SUCCESSFUL) {
          resultIndex = result.index;
        } else if (result.state == StripingChunkReadResult.FAILED) {
          // If the read failed for some source DN, we should not use it
          // anymore; instead, schedule a read from another source DN.
          StripedBlockReader failedReader = readers.get(result.index);
          failedReader.closeBlockReader();
          resultIndex = scheduleNewRead(usedFlag,
              reconstructLength, corruptedBlocks);
        } else if (result.state == StripingChunkReadResult.TIMEOUT) {
          // On timeout, we also schedule a new read.
          resultIndex = scheduleNewRead(usedFlag,
              reconstructLength, corruptedBlocks);
        }
        if (resultIndex >= 0) {
          newSuccess[nSuccess++] = resultIndex;
          if (nSuccess >= minRequiredSources) {
            // cancel remaining reads if we read successfully from minimum
            // number of source DNs required by reconstruction.
            cancelReads(futures.keySet());
            clearFuturesAndService();
            break;
          }
        }
      } catch (InterruptedException e) {
        LOG.info("Read data interrupted.", e);
        cancelReads(futures.keySet());
        clearFuturesAndService();
        break;
      }
    }

    if (nSuccess < minRequiredSources) {
      String error = "Can't read data from minimum number of sources "
          + "required by reconstruction, block id: " +
          reconstructor.getBlockGroup().getBlockId();
      throw new IOException(error);
    }

    return newSuccess;
  }

  /**
   * Schedule a read from some new source DN if a DN is corrupted or slow;
   * this is called from within the read iteration. Initially we may only
   * have <code>minRequiredSources</code> StripedBlockReaders.
   * If the position is at the end of the target block, no real read is
   * needed, and the array index of the source DN is returned directly;
   * otherwise -1 is returned and the read is submitted asynchronously.
   *
   * @param used the source DNs already used in this iteration.
   * @return the array index of the source DN if no real read is needed,
   *         otherwise -1.
   */
  private int scheduleNewRead(BitSet used, int reconstructLength,
                              CorruptedBlocks corruptedBlocks) {
    StripedBlockReader reader = null;
    // step1: initially we may only have <code>minRequiredSources</code>
    // StripedBlockReaders, and there may be some source DNs we have never
    // read from before, so try to create a StripedBlockReader for one new
    // source DN and read from it. If one is found, go to step 3.
    int m = readers.size();
    int toRead = 0;
    while (reader == null && m < sources.length) {
      reader = createReader(m, reconstructor.getPositionInBlock());
      readers.add(reader);
      toRead = getReadLength(liveIndices[m], reconstructLength);
      if (toRead > 0) {
        if (reader.getBlockReader() == null) {
          reader = null;
          m++;
        }
      } else {
        used.set(m);
        return m;
      }
    }

    // step2: if there is no new source DN we can use, try to find a source
    // DN we have read from before but which, for some reason (e.g., being
    // slow), was not in the success list at the beginning of this
    // iteration, so we have not tried it yet in this iteration. Now we
    // have a chance to revisit it.
    for (int i = 0; reader == null && i < readers.size(); i++) {
      if (!used.get(i)) {
        StripedBlockReader stripedReader = readers.get(i);
        toRead = getReadLength(liveIndices[i], reconstructLength);
        if (toRead > 0) {
          stripedReader.closeBlockReader();
          stripedReader.resetBlockReader(reconstructor.getPositionInBlock());
          if (stripedReader.getBlockReader() != null) {
            stripedReader.getReadBuffer().position(0);
            m = i;
            reader = stripedReader;
          }
        } else {
          used.set(i);
          stripedReader.getReadBuffer().position(0);
          return i;
        }
      }
    }

    // step3: schedule the read if we found a usable source DN and a real
    // read is needed.
    if (reader != null) {
      Callable<BlockReadStats> readCallable =
          reader.readFromBlock(toRead, corruptedBlocks);
      Future<BlockReadStats> f = readService.submit(readCallable);
      futures.put(f, m);
      used.set(m);
    }

    return -1;
  }

  // Cancel all reads.
  private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
    for (Future<BlockReadStats> future : futures) {
      future.cancel(true);
    }
  }

  // Remove all stale futures from readService, and clear the futures map.
  private void clearFuturesAndService() {
    while (!futures.isEmpty()) {
      try {
        Future<BlockReadStats> future = readService.poll(
            stripedReadTimeoutInMills, TimeUnit.MILLISECONDS
        );
        futures.remove(future);
      } catch (InterruptedException e) {
        LOG.info("Clear stale futures from service is interrupted.", e);
      }
    }
  }

  void close() {
    if (zeroStripeBuffers != null) {
      for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
        reconstructor.freeBuffer(zeroStripeBuffer);
      }
    }
    zeroStripeBuffers = null;

    for (StripedBlockReader reader : readers) {
      reader.closeBlockReader();
      reconstructor.freeBuffer(reader.getReadBuffer());
      reader.freeReadBuffer();
    }
  }

  StripedReconstructor getReconstructor() {
    return reconstructor;
  }

  StripedBlockReader getReader(int i) {
    return readers.get(i);
  }

  int getBufferSize() {
    return bufferSize;
  }

  DataChecksum getChecksum() {
    return checksum;
  }

  void clearBuffers() {
    if (zeroStripeBuffers != null) {
      for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
        zeroStripeBuffer.clear();
      }
    }

    for (StripedBlockReader reader : readers) {
      if (reader.getReadBuffer() != null) {
        reader.getReadBuffer().clear();
      }
    }
  }

  InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
    return reconstructor.getSocketAddress4Transfer(dnInfo);
  }

  CachingStrategy getCachingStrategy() {
    return reconstructor.getCachingStrategy();
  }

  /**
   * Return the xmits of this EC reconstruction task.
   * <p>
   * DN uses it to coordinate with NN to adjust the speed of scheduling the
   * EC reconstruction tasks to this DN.
   *
   * @return the xmits of this reconstruction task.
   */
  int getXmits() {
    return xmits;
  }

  public int getMinRequiredSources() {
    return minRequiredSources;
  }

}
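
The listing above is easiest to follow from the caller's side. Below is a minimal usage sketch based only on the package-private methods shown in this file: it is not Hadoop code; the class name StripedReaderUsageSketch, the remainingBytes parameter, and the decode placeholder are hypothetical. In the real code path, this loop lives in StripedReconstructor/StripedBlockReconstructor together with the raw erasure decoder and StripedWriter.

// A minimal usage sketch (not part of Hadoop). It must live in the same
// package because StripedReader and its methods are package-private.
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import java.io.IOException;
import java.nio.ByteBuffer;

final class StripedReaderUsageSketch {

  // Drives one reconstruction task: read a stripe's worth of input from the
  // minimum number of live sources, hand it to the decoder, then advance.
  static void reconstruct(StripedReader reader, long remainingBytes)
      throws IOException {
    reader.init();  // connect to minRequiredSources DNs and size the buffers
    try {
      while (remainingBytes > 0) {
        int toReconstruct =
            (int) Math.min(reader.getBufferSize(), remainingBytes);
        // Reads from the current success list, transparently replacing
        // corrupt or slow DNs via scheduleNewRead().
        reader.readMinimumSources(toReconstruct);
        ByteBuffer[] inputs = reader.getInputBuffers(toReconstruct);
        decode(inputs);  // hypothetical stand-in for the erasure decoder
        remainingBytes -= toReconstruct;
        reader.clearBuffers();  // reset read buffers for the next stripe
      }
    } finally {
      reader.close();  // close block readers and free pooled buffers
    }
  }

  private static void decode(ByteBuffer[] inputs) {
    // Placeholder: the real path calls a RawErasureDecoder and then hands
    // the reconstructed data to StripedWriter for transfer to target DNs.
  }
}

Note that the real driver also advances the position in the block (see reconstructor.getPositionInBlock() in the listing) between iterations and writes the decoded stripe to the target DNs; that bookkeeping is elided here.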

Related information

hadoop source code directory

Related articles

hadoop ErasureCodingWorker source code

hadoop StripedBlockChecksumCompositeCrcReconstructor source code

hadoop StripedBlockChecksumMd5CrcReconstructor source code

hadoop StripedBlockChecksumReconstructor source code

hadoop StripedBlockReader source code

hadoop StripedBlockReconstructor source code

hadoop StripedBlockWriter source code

hadoop StripedReconstructionInfo source code

hadoop StripedReconstructor source code

hadoop StripedWriter source code
