hadoop ReplicaInputStreams 源码

  • 2022-10-20
  • 浏览 (244)

haddop ReplicaInputStreams 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;

/**
 * Contains the input streams for the data and checksum of a replica.
 */
public class ReplicaInputStreams implements Closeable {
  public static final Logger LOG = DataNode.LOG;

  private InputStream dataIn;
  private InputStream checksumIn;
  private FsVolumeReference volumeRef;
  private final FileIoProvider fileIoProvider;
  private FileDescriptor dataInFd = null;

  /** Create an object with a data input stream and a checksum input stream. */
  public ReplicaInputStreams(
      InputStream dataStream, InputStream checksumStream,
      FsVolumeReference volumeRef, FileIoProvider fileIoProvider) {
    this.volumeRef = volumeRef;
    this.fileIoProvider = fileIoProvider;
    this.dataIn = dataStream;
    this.checksumIn = checksumStream;
    if (dataIn instanceof FileInputStream) {
      try {
        dataInFd = ((FileInputStream) dataIn).getFD();
      } catch (Exception e) {
        LOG.warn("Could not get file descriptor for inputstream of class " +
            this.dataIn.getClass());
      }
    } else {
      LOG.debug("Could not get file descriptor for inputstream of class " +
          this.dataIn.getClass());
    }
  }

  /** @return the data input stream. */
  public InputStream getDataIn() {
    return dataIn;
  }

  /** @return the checksum input stream. */
  public InputStream getChecksumIn() {
    return checksumIn;
  }

  public FileDescriptor getDataInFd() {
    return dataInFd;
  }

  public FsVolumeReference getVolumeRef() {
    return volumeRef;
  }

  public void readDataFully(byte[] buf, int off, int len)
      throws IOException {
    IOUtils.readFully(dataIn, buf, off, len);
  }

  public void readChecksumFully(byte[] buf, int off, int len)
      throws IOException {
    IOUtils.readFully(checksumIn, buf, off, len);
  }

  public void skipDataFully(long len) throws IOException {
    IOUtils.skipFully(dataIn, len);
  }

  public void skipChecksumFully(long len) throws IOException {
    IOUtils.skipFully(checksumIn, len);
  }

  public void closeChecksumStream() throws IOException {
    IOUtils.closeStream(checksumIn);
    checksumIn = null;
  }

  public void dropCacheBehindReads(String identifier, long offset, long len,
      int flags) throws NativeIOException {
    assert this.dataInFd != null : "null dataInFd!";
    fileIoProvider.posixFadvise(getVolumeRef().getVolume(),
        identifier, dataInFd, offset, len, flags);
  }

  public void closeStreams() throws IOException {
    IOException ioe = null;
    if(checksumIn!=null) {
      try {
        checksumIn.close(); // close checksum file
      } catch (IOException e) {
        ioe = e;
      }
      checksumIn = null;
    }
    if(dataIn!=null) {
      try {
        dataIn.close(); // close data file
      } catch (IOException e) {
        ioe = e;
      }
      dataIn = null;
      dataInFd = null;
    }
    if (volumeRef != null) {
      IOUtils.cleanupWithLogger(null, volumeRef);
      volumeRef = null;
    }
    // throw IOException if there is any
    if(ioe!= null) {
      throw ioe;
    }
  }

  @Override
  public void close() {
    IOUtils.closeStream(dataIn);
    dataIn = null;
    dataInFd = null;
    IOUtils.closeStream(checksumIn);
    checksumIn = null;
    IOUtils.cleanupWithLogger(null, volumeRef);
    volumeRef = null;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AvailableSpaceVolumeChoosingPolicy 源码

hadoop DataNodeVolumeMetrics 源码

hadoop FsDatasetSpi 源码

hadoop FsVolumeReference 源码

hadoop FsVolumeSpi 源码

hadoop LengthInputStream 源码

hadoop ReplicaOutputStreams 源码

hadoop RoundRobinVolumeChoosingPolicy 源码

hadoop VolumeChoosingPolicy 源码

0  赞