hadoop StripedBlockReader 源码

  • 2022-10-20
  • 浏览 (271)

hadoop StripedBlockReader 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.concurrent.Callable;

/**
 * StripedBlockReader is used to read block data from one source DN, it contains
 * a block reader, read buffer and striped block index.
 * Only allocate StripedBlockReader once for one source, and the StripedReader
 * has the same array order with sources. Typically we only need to allocate
 * minimum number (minRequiredSources) of StripedReader, and allocate
 * new for new source DN if some existing DN invalid or slow.
 * If some source DN is corrupt, set the corresponding blockReader to
 * null and will never read from it again.
 */
@InterfaceAudience.Private
class StripedBlockReader {
  private static final Logger LOG = DataNode.LOG;

  private StripedReader stripedReader;
  private final DataNode datanode;
  private final Configuration conf;

  private final short index; // internal block index
  private final ExtendedBlock block;
  private final DatanodeInfo source;
  private BlockReader blockReader;
  private ByteBuffer buffer;
  private boolean isLocal;

  StripedBlockReader(StripedReader stripedReader, DataNode datanode,
                     Configuration conf, short index, ExtendedBlock block,
                     DatanodeInfo source, long offsetInBlock) {
    this.stripedReader = stripedReader;
    this.datanode = datanode;
    this.conf = conf;

    this.index = index;
    this.source = source;
    this.block = block;
    this.isLocal = false;

    BlockReader tmpBlockReader = createBlockReader(offsetInBlock);
    if (tmpBlockReader != null) {
      this.blockReader = tmpBlockReader;
    }
  }

  ByteBuffer getReadBuffer() {
    if (buffer == null) {
      this.buffer = stripedReader.allocateReadBuffer();
    }
    return buffer;
  }

  void freeReadBuffer() {
    DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer();
    buffer = null;
  }

  void resetBlockReader(long offsetInBlock) {
    this.blockReader = createBlockReader(offsetInBlock);
  }

  private BlockReader createBlockReader(long offsetInBlock) {
    if (offsetInBlock >= block.getNumBytes()) {
      return null;
    }
    Peer peer = null;
    try {
      InetSocketAddress dnAddr =
          stripedReader.getSocketAddress4Transfer(source);
      Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
          block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
          StorageType.EMPTY_ARRAY, new String[0]);
        /*
         * This can be further improved if the replica is local, then we can
         * read directly from DN and need to check the replica is FINALIZED
         * state, notice we should not use short-circuit local read which
         * requires config for domain-socket in UNIX or legacy config in
         * Windows. The network distance value isn't used for this scenario.
         *
         * TODO: add proper tracer
         */
      peer = newConnectedPeer(block, dnAddr, blockToken, source);
      if (peer.isLocal()) {
        this.isLocal = true;
      }
      return BlockReaderRemote.newBlockReader(
          "dummy", block, blockToken, offsetInBlock,
          block.getNumBytes() - offsetInBlock, true, "", peer, source,
          null, stripedReader.getCachingStrategy(), -1, conf);
    } catch (IOException e) {
      LOG.info("Exception while creating remote block reader, datanode {}",
          source, e);
      IOUtils.closeStream(peer);
      return null;
    }
  }

  private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
                                Token<BlockTokenIdentifier> blockToken,
                                DatanodeID datanodeId)
      throws IOException {
    Peer peer = null;
    boolean success = false;
    Socket sock = null;
    final int socketTimeout = datanode.getDnConf().getSocketTimeout();
    try {
      sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
      NetUtils.connect(sock, addr, socketTimeout);
      peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
          sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
          blockToken, datanodeId, socketTimeout);
      success = true;
      return peer;
    } finally {
      if (!success) {
        IOUtils.cleanupWithLogger(null, peer);
        IOUtils.closeSocket(sock);
      }
    }
  }

  Callable<BlockReadStats> readFromBlock(final int length,
                               final CorruptedBlocks corruptedBlocks) {
    return new Callable<BlockReadStats>() {

      @Override
      public BlockReadStats call() throws Exception {
        try {
          getReadBuffer().limit(length);
          return actualReadFromBlock();
        } catch (ChecksumException e) {
          LOG.warn("Found Checksum error for {} from {} at {}", block,
              source, e.getPos());
          corruptedBlocks.addCorruptedBlock(block, source);
          throw e;
        } catch (IOException e) {
          LOG.info(e.getMessage());
          throw e;
        } finally {
          DataNodeFaultInjector.get().interceptBlockReader();
        }
      }
    };
  }

  /**
   * Perform actual reading of bytes from block.
   */
  private BlockReadStats actualReadFromBlock() throws IOException {
    DataNodeFaultInjector.get().delayBlockReader();
    int len = buffer.remaining();
    int n = 0;
    while (n < len) {
      int nread = blockReader.read(buffer);
      if (nread <= 0) {
        break;
      }
      n += nread;
      stripedReader.getReconstructor().incrBytesRead(isLocal, nread);
    }
    return new BlockReadStats(n, blockReader.isShortCircuit(),
        blockReader.getNetworkDistance());
  }

  // close block reader
  void closeBlockReader() {
    IOUtils.closeStream(blockReader);
    blockReader = null;
  }

  short getIndex() {
    return index;
  }

  BlockReader getBlockReader() {
    return blockReader;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ErasureCodingWorker 源码

hadoop StripedBlockChecksumCompositeCrcReconstructor 源码

hadoop StripedBlockChecksumMd5CrcReconstructor 源码

hadoop StripedBlockChecksumReconstructor 源码

hadoop StripedBlockReconstructor 源码

hadoop StripedBlockWriter 源码

hadoop StripedReader 源码

hadoop StripedReconstructionInfo 源码

hadoop StripedReconstructor 源码

hadoop StripedWriter 源码

0  赞