hadoop DFSInotifyEventInputStream 源码

  • 2022-10-20
  • 浏览 (228)

hadoop DFSInotifyEventInputStream 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.util.Collections;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Client-side reader for the NameNode's inotify event stream. Batches are
 * fetched from the NameNode on demand and handed out one at a time.
 * <p>
 * An instance keeps unsynchronized mutable state and should not be shared
 * among multiple threads.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class DFSInotifyEventInputStream {
  public static final Logger LOG = LoggerFactory.getLogger(
      DFSInotifyEventInputStream.class);

  /** RPC handle used to query the NameNode for txids and edits. */
  private final ClientProtocol namenode;
  /** Batches from the most recent fetch that have not been returned yet. */
  private Iterator<EventBatch> it;
  /**
   * Last txid covered by the most recent successful fetch, or -1 while the
   * starting position still has to be obtained from the NameNode.
   */
  private long lastReadTxid;
  /**
   * The most recent txid the NameNode reported as sync'ed; compared against
   * {@link #lastReadTxid} to estimate how far behind this stream is. Stays 0
   * until a fetch actually returns edits.
   */
  private long syncTxid;
  /**
   * Source of jitter for the sleep intervals in
   * {@link DFSInotifyEventInputStream#take()}.
   */
  private Random rng = new Random();

  private final Tracer tracer;

  /** Base back-off, in milliseconds, used by the blocking read methods. */
  private static final int INITIAL_WAIT_MS = 10;

  /**
   * Creates a stream that starts at the NameNode's current edit log txid,
   * so only transactions newer than that are considered.
   *
   * @param namenode protocol handle used to talk to the NameNode
   * @param tracer tracer used to open a scope around each read call
   * @throws IOException if the current txid cannot be read from the NameNode
   */
  DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer)
        throws IOException {
    this(namenode, tracer, namenode.getCurrentEditLogTxid());
  }

  /**
   * Creates a stream that resumes after the given transaction id.
   *
   * @param namenode protocol handle used to talk to the NameNode
   * @param tracer tracer used to open a scope around each read call
   * @param lastReadTxid txid after which events are requested; -1 makes the
   *                     first {@link #poll()} look up the current txid
   */
  DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer,
      long lastReadTxid) {
    this.namenode = namenode;
    this.tracer = tracer;
    this.lastReadTxid = lastReadTxid;
    this.it = Collections.emptyIterator();
  }

  /**
   * Non-blocking read: returns the next batch of events, or null when no new
   * batch is currently available.
   *
   * @return the next event batch, or null if there is none right now
   * @throws IOException because of network error or edit log corruption.
   * Also possible if JournalNodes are unresponsive in the QJM setting (even
   * one unresponsive JournalNode is enough in rare cases), so catching this
   * exception and retrying at least a few times is recommended.
   * @throws MissingEventsException if the next batch cannot be returned
   * because the data for its events (and possibly some subsequent events)
   * has been deleted, generally because this stream is a very large number
   * of transactions behind the current state of the NameNode. It is safe to
   * keep reading after this exception; the next available batch of events
   * will be returned.
   */
  public EventBatch poll() throws IOException, MissingEventsException {
    try (TraceScope ignored = tracer.newScope("inotifyPoll")) {
      // No starting position yet: ask the NN for its latest txid and let the
      // caller retry.
      if (lastReadTxid == -1) {
        LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
        lastReadTxid = namenode.getCurrentEditLogTxid();
        return null;
      }

      // Hand out whatever is left over from the previous fetch first.
      if (it.hasNext()) {
        return it.next();
      }

      EventBatchList fetched = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (fetched.getLastTxid() == -1) {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
            "after txid {}", lastReadTxid);
        return null;
      }

      // syncTxid is only refreshed when edits were actually read from the
      // NN. Otherwise a temporary inability to read edits would look as if
      // edits were being generated faster than we can consume them.
      syncTxid = fetched.getSyncTxid();
      it = fetched.getBatches().iterator();
      final long expectedFirstTxid = lastReadTxid + 1;
      lastReadTxid = fetched.getLastTxid();
      if (fetched.getFirstTxid() != expectedFirstTxid) {
        // State was already advanced above, so the next call resumes with
        // the batches that are still available.
        throw new MissingEventsException(expectedFirstTxid,
            fetched.getFirstTxid());
      }

      // The iterator can still be empty here: edits were read, but none of
      // the newly seen edit log ops were converted to events.
      return it.hasNext() ? it.next() : null;
    }
  }

  /**
   * Estimates how many transaction ids this stream is behind the NameNode's
   * current state. Clients should call this periodically and watch for a
   * steadily increasing result, which indicates that transactions are being
   * generated faster than the client reads them. A client that falls too far
   * behind may find events deleted before it can read them.
   * <p>
   * A return value of -1 means no estimate could be produced and should be
   * ignored. The value is really only useful when compared with previous or
   * subsequent results.
   *
   * @return the estimated txid lag, or -1 if no estimate is available
   */
  public long getTxidsBehindEstimate() {
    if (syncTxid == 0) {
      return -1;
    }
    assert syncTxid >= lastReadTxid;
    // Difference between syncTxid as of the last fetch from the NameNode and
    // the last txid that fetch delivered to this client.
    return syncTxid - lastReadTxid;
  }

  /**
   * Returns the next event batch, waiting up to the given amount of time for
   * one to become available. Returns null if none arrives in time. The call
   * may overrun the requested time by up to the duration of one RPC to the
   * NameNode.
   *
   * @param time number of units of the given TimeUnit to wait
   * @param tu the desired TimeUnit
   * @return the next event batch, or null if the wait timed out
   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
   * @throws MissingEventsException
   * see {@link DFSInotifyEventInputStream#poll()}
   * @throws InterruptedException if the calling thread is interrupted
   */
  public EventBatch poll(long time, TimeUnit tu) throws IOException,
      InterruptedException, MissingEventsException {
    try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) {
      final long startMs = Time.monotonicNow();
      final long budgetMs = tu.toMillis(time);
      long sleepMs = INITIAL_WAIT_MS;
      while (true) {
        EventBatch batch = poll();
        if (batch != null) {
          return batch;
        }
        long remainingMs = budgetMs - (Time.monotonicNow() - startMs);
        if (remainingMs <= 0) {
          LOG.debug("timed poll(): timed out");
          return null;
        }
        // Double the sleep each round, but never sleep past the deadline.
        long doubledMs = sleepMs * 2;
        sleepMs = (remainingMs < doubledMs) ? remainingMs : doubledMs;
        LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
            sleepMs);
        Thread.sleep(sleepMs);
      }
    }
  }

  /**
   * Returns the next batch of events, blocking indefinitely until one is
   * available.
   *
   * @return the next event batch; never null
   * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
   * @throws MissingEventsException see
   * {@link DFSInotifyEventInputStream#poll()}
   * @throws InterruptedException if the calling thread is interrupted
   */
  public EventBatch take() throws IOException, InterruptedException,
      MissingEventsException {
    try (TraceScope ignored = tracer.newScope("inotifyTake")) {
      int minSleepMs = INITIAL_WAIT_MS;
      EventBatch batch = poll();
      while (batch == null) {
        // Sleep a random time in [minSleepMs, 2 * minSleepMs) so that
        // multiple clients do not stampede the NN at the same moment.
        int sleepTime = minSleepMs + rng.nextInt(minSleepMs);
        LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
        Thread.sleep(sleepTime);
        // The lower bound is capped at 60000 ms, which keeps any single
        // sleep under two minutes.
        minSleepMs = Math.min(60000, minSleepMs * 2);
        batch = poll();
      }
      return batch;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AddBlockFlag 源码

hadoop BlockMissingException 源码

hadoop BlockReader 源码

hadoop CannotObtainBlockLengthException 源码

hadoop ClientContext 源码

hadoop ClientGSIContext 源码

hadoop DFSClient 源码

hadoop DFSClientFaultInjector 源码

hadoop DFSHedgedReadMetrics 源码

hadoop DFSInputStream 源码

0  赞