hadoop LeaseRenewer 源码

  • 2022-10-20
  • 浏览 (234)

hadoop LeaseRenewer 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.client.impl;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing
 * file-being-written leases on the namenode.
 * When a file is opened for write (create or append),
 * namenode stores a file lease for recording the identity of the writer.
 * The writer (i.e. the DFSClient) is required to renew the lease periodically.
 * When the lease is not renewed before it expires,
 * the namenode considers the writer as failed and then it may either let
 * another writer obtain the lease or close the file.
 * </p>
 * <p>
 * This class also provides the following functionality:
 * </p>
 * <ul>
 * <li>
 * It maintains a map from (namenode, user) pairs to lease renewers.
 * The same {@link LeaseRenewer} instance is used for renewing lease
 * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and
 * the same user.
 * </li>
 * <li>
 * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
 * Periodically the leases for all the clients are renewed.
 * A client is removed from the list when the client is closed.
 * </li>
 * <li>
 * A thread per namenode per user is used by the {@link LeaseRenewer}
 * to renew the leases.
 * </li>
 * </ul>
 * <p>
 * Thread-safety: the mutable per-instance state (client list, timers, daemon
 * reference and daemon id) is only accessed while holding the instance's
 * monitor. The renewer map is guarded by the {@code Factory} singleton's
 * monitor. Code paths that need both acquire the instance monitor first.
 * </p>
 */
@InterfaceAudience.Private
public class LeaseRenewer {
  public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);

  /**
   * Grace period in milliseconds given to newly created renewers. Tests may
   * override it via {@link #setLeaseRenewerGraceDefault(long)}.
   */
  private static long leaseRenewerGraceDefault = 60*1000L;
  /** Upper bound in milliseconds on the daemon's sleep between iterations. */
  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

  /**
   * Set when this renewer starts its renewal daemon; never cleared.
   * A renewer therefore starts at most one daemon in its lifetime. When that
   * daemon exits, it removes the renewer from the factory, so later callers
   * get a fresh instance.
   */
  private final AtomicBoolean isLSRunning = new AtomicBoolean(false);

  /**
   * Get the shared {@link LeaseRenewer} for the given namenode authority and
   * user, creating it if necessary, and register {@code dfsc} with it.
   *
   * @param authority namenode authority; must not be null
   * @param ugi the user; must not be null
   * @param dfsc the client to register with the renewer
   * @return the renewer shared by all clients of (authority, ugi)
   * @throws HadoopIllegalArgumentException if authority or ugi is null
   */
  public static LeaseRenewer getInstance(final String authority,
      final UserGroupInformation ugi, final DFSClient dfsc) {
    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
    r.addClient(dfsc);
    return r;
  }

  /**
   * Remove the given renewer from the Factory.
   * Subsequent call will receive new {@link LeaseRenewer} instance.
   * @param renewer Instance to be cleared from Factory
   */
  public static void remove(LeaseRenewer renewer) {
    // Lock order: renewer monitor first, then the Factory monitor.
    synchronized (renewer) {
      Factory.INSTANCE.remove(renewer);
    }
  }

  /**
   * A factory for sharing {@link LeaseRenewer} objects
   * among {@link DFSClient} instances
   * so that there is only one renewer per authority per user.
   */
  private static class Factory {
    private static final Factory INSTANCE = new Factory();

    /** Map key identifying a (namenode authority, user) pair. */
    private static class Key {
      /** Namenode info */
      final String authority;
      /** User info */
      final UserGroupInformation ugi;

      private Key(final String authority, final UserGroupInformation ugi) {
        if (authority == null) {
          throw new HadoopIllegalArgumentException("authority == null");
        } else if (ugi == null) {
          throw new HadoopIllegalArgumentException("ugi == null");
        }

        this.authority = authority;
        this.ugi = ugi;
      }

      @Override
      public int hashCode() {
        return authority.hashCode() ^ ugi.hashCode();
      }

      @Override
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (obj instanceof Key) {
          final Key that = (Key)obj;
          return this.authority.equals(that.authority)
                 && this.ugi.equals(that.ugi);
        }
        return false;
      }

      @Override
      public String toString() {
        return ugi.getShortUserName() + "@" + authority;
      }
    }

    /** A map for per user per namenode renewers. */
    private final Map<Key, LeaseRenewer> renewers = new HashMap<>();

    /**
     * Get the renewer for (authority, ugi), creating and registering a new
     * one if none is registered yet.
     */
    private synchronized LeaseRenewer get(final String authority,
        final UserGroupInformation ugi) {
      final Key k = new Key(authority, ugi);
      LeaseRenewer r = renewers.get(k);
      if (r == null) {
        r = new LeaseRenewer(k);
        renewers.put(k, r);
      }
      return r;
    }

    /**
     * Remove the given renewer, but only if it is still the instance
     * registered for its key. Its clients are cleared and its empty time is
     * set to 0, which is meant to make it count as expired so its daemon
     * exits as soon as possible.
     */
    private synchronized void remove(final LeaseRenewer r) {
      final LeaseRenewer stored = renewers.get(r.factorykey);
      //Since a renewer may expire, the stored renewer can be different.
      if (r == stored) {
        // Expire LeaseRenewer daemon thread as soon as possible.
        r.clearClients();
        r.setEmptyTime(0);
        renewers.remove(r.factorykey);
      }
    }
  }

  /**
   * The monotonic time in milliseconds at which the renewer became idle: the
   * client list became empty, or no registered client was still running.
   * {@code Long.MAX_VALUE} means "not idle".
   */
  private long emptyTime = Long.MAX_VALUE;
  /**
   * Lease renewal period in milliseconds. It defaults to half the soft limit
   * and is lowered to half of a registered client's positive hdfsTimeout
   * when that is shorter.
   */
  private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD / 2;

  /** A daemon for renewing lease */
  private Daemon daemon = null;
  /** Only the daemon with currentId should run. */
  private int currentId = 0;

  /**
   * A period in milliseconds that the lease renewer thread should run
   * after the map became empty.
   * In other words,
   * if the map is empty for a time period longer than the grace period,
   * the renewer should terminate.
   */
  private long gracePeriod;
  /**
   * The time period in milliseconds
   * that the renewer sleeps for each iteration.
   */
  private long sleepPeriod;

  /** Key under which this renewer is registered in the Factory. */
  private final Factory.Key factorykey;

  /** A list of clients corresponding to this renewer. */
  private final List<DFSClient> dfsclients = new ArrayList<>();

  /**
   * A stringified stack trace of the call stack when the Lease Renewer
   * was instantiated. This is only generated if trace-level logging is
   * enabled on this class.
   */
  private final String instantiationTrace;

  private LeaseRenewer(Factory.Key factorykey) {
    this.factorykey = factorykey;
    // The object is not yet published, so the unsynchronized setter is safe.
    unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault);

    if (LOG.isTraceEnabled()) {
      instantiationTrace = StringUtils.stringifyException(
        new Throwable("TRACE"));
    } else {
      instantiationTrace = null;
    }
  }

  /** @return the renewal time in milliseconds. */
  private synchronized long getRenewalTime() {
    return renewal;
  }

  /** Used for testing only. */
  @VisibleForTesting
  public synchronized void setRenewalTime(final long renewal) {
    this.renewal = renewal;
  }

  /**
   * Register a client. Duplicates are detected by identity and ignored.
   * If the client has a positive hdfsTimeout, the renewal period is lowered
   * to half of it when that is shorter than the current period.
   */
  private synchronized void addClient(final DFSClient dfsc) {
    for(DFSClient c : dfsclients) {
      if (c == dfsc) {
        //client already exists, nothing to do.
        return;
      }
    }
    //client not found, add it
    dfsclients.add(dfsc);

    //update renewal time
    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
    if (hdfsTimeout > 0) {
      final long half = hdfsTimeout/2;
      if (half < renewal) {
        this.renewal = half;
      }
    }
  }

  /** Drop all registered clients. */
  private synchronized void clearClients() {
    dfsclients.clear();
  }

  /**
   * Drop registered clients that are no longer running.
   *
   * @return true if at least one registered client remains
   */
  private synchronized boolean clientsRunning() {
    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
      if (!i.next().isClientRunning()) {
        i.remove();
      }
    }
    return !dfsclients.isEmpty();
  }

  private synchronized long getSleepPeriod() {
    return sleepPeriod;
  }

  /** Set the grace period and adjust the sleep period accordingly. */
  synchronized void setGraceSleepPeriod(final long gracePeriod) {
    unsyncSetGraceSleepPeriod(gracePeriod);
  }

  /**
   * Set the grace period and derive the sleep period from it. The sleep
   * period is half the grace period, capped at
   * {@link #LEASE_RENEWER_SLEEP_DEFAULT}. Callers must hold the monitor or
   * be the constructor.
   *
   * @throws HadoopIllegalArgumentException if gracePeriod is below 100 ms
   */
  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
    if (gracePeriod < 100L) {
      throw new HadoopIllegalArgumentException(gracePeriod
          + " = gracePeriod < 100ms is too small.");
    }
    this.gracePeriod = gracePeriod;
    final long half = gracePeriod/2;
    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
        half: LEASE_RENEWER_SLEEP_DEFAULT;
  }

  /** Is the daemon running? */
  @VisibleForTesting
  public synchronized boolean isRunning() {
    return daemon != null && daemon.isAlive();
  }

  /**
   * Does this renewer have nothing to renew?
   * Synchronized because {@code dfsclients} is a plain ArrayList that is
   * mutated under this monitor by other threads.
   */
  public synchronized boolean isEmpty() {
    return dfsclients.isEmpty();
  }

  /** Used only by tests */
  synchronized String getDaemonName() {
    return daemon.getName();
  }

  /** Is the empty period longer than the grace period? */
  private synchronized boolean isRenewerExpired() {
    return emptyTime != Long.MAX_VALUE
        && Time.monotonicNow() - emptyTime > gracePeriod;
  }

  /**
   * Make sure a renewal daemon is running for {@code dfsc}.
   * <p>
   * If {@code dfsc} is running and this renewer has no live daemon, or its
   * idle period has exceeded the grace period, a new daemon is started with a
   * new id. A renewer starts at most one daemon. If it already has, this
   * method returns {@code false} and the caller should obtain a new renewer.
   * Otherwise, when {@code dfsc} is running, the idle marker is reset.
   *
   * @param dfsc the client that needs its leases renewed
   * @return false if a new daemon was needed but this renewer already started
   *         one; true otherwise
   */
  public synchronized boolean put(final DFSClient dfsc) {
    if (dfsc.isClientRunning()) {
      if (!isRunning() || isRenewerExpired()) {
        // Start a new daemon with a new id.
        final int id = ++currentId;
        if (isLSRunning.get()) {
          // Not allowed to add multiple daemons into LeaseRenewer, let client
          // create new LR and continue to acquire lease.
          return false;
        }
        isLSRunning.set(true);

        daemon = new Daemon(new Runnable() {
          @Override
          public void run() {
            try {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Lease renewer daemon for " + clientsString()
                    + " with renew id " + id + " started");
              }
              LeaseRenewer.this.run(id);
            } catch(InterruptedException e) {
              LOG.debug("LeaseRenewer is interrupted.", e);
            } finally {
              // This renewer never starts another daemon, so unregister it
              // and let clients create a fresh one.
              synchronized(LeaseRenewer.this) {
                Factory.INSTANCE.remove(LeaseRenewer.this);
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug("Lease renewer daemon for " + clientsString()
                    + " with renew id " + id + " exited");
              }
            }
          }

          @Override
          public String toString() {
            return String.valueOf(LeaseRenewer.this);
          }
        });
        daemon.start();
      }
      emptyTime = Long.MAX_VALUE;
    }
    return true;
  }

  @VisibleForTesting
  synchronized void setEmptyTime(long time) {
    emptyTime = time;
  }

  /**
   * Close the given client.
   * <p>
   * If this leaves no clients and the daemon is not running, or the renewer
   * has already expired, the renewer is removed from the factory right away.
   * Otherwise the idle period starts (if not already started). If the renewal
   * period was derived from this client's hdfsTimeout, it is recomputed from
   * the remaining clients.
   */
  public synchronized void closeClient(final DFSClient dfsc) {
    dfsclients.remove(dfsc);
    if (dfsclients.isEmpty()) {
      if (!isRunning() || isRenewerExpired()) {
        Factory.INSTANCE.remove(this);
        return;
      }
      if (emptyTime == Long.MAX_VALUE) {
        //discover the first time that the client list is empty.
        emptyTime = Time.monotonicNow();
      }
    }

    //update renewal time
    if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
      long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
      for(DFSClient c : dfsclients) {
        final int timeout = c.getConf().getHdfsTimeout();
        if (timeout > 0 && timeout < min) {
          min = timeout;
        }
      }
      renewal = min/2;
    }
  }

  /**
   * Interrupt the renewal daemon, if it is running, and wait for it to exit.
   * The join happens outside the monitor because the exiting daemon itself
   * synchronizes on this renewer in its cleanup.
   *
   * @throws InterruptedException if the calling thread is interrupted while
   *         waiting
   */
  public void interruptAndJoin() throws InterruptedException {
    Daemon daemonCopy = null;
    synchronized (this) {
      if (isRunning()) {
        daemon.interrupt();
        daemonCopy = daemon;
      }
    }

    if (daemonCopy != null) {
      LOG.debug("Wait for lease checker to terminate");
      daemonCopy.join();
    }
  }

  /**
   * Renew leases for a snapshot of the registered clients.
   * Clients are sorted by name so that consecutive clients sharing a name are
   * renewed only once. If renewal for a client returns false, the next client
   * with the same name is tried instead.
   *
   * @throws IOException propagated from a client's lease renewal
   */
  private void renew() throws IOException {
    final List<DFSClient> copies;
    synchronized(this) {
      copies = new ArrayList<>(dfsclients);
    }
    //sort the client names for finding out repeated names.
    Collections.sort(copies, new Comparator<DFSClient>() {
      @Override
      public int compare(final DFSClient left, final DFSClient right) {
        return left.getClientName().compareTo(right.getClientName());
      }
    });
    String previousName = "";
    for (final DFSClient c : copies) {
      //skip if current client name is the same as the previous name.
      if (!c.getClientName().equals(previousName)) {
        if (!c.renewLease()) {
          LOG.debug("Did not renew lease for client {}", c);
          continue;
        }
        previousName = c.getClientName();
        LOG.debug("Lease renewed for client {}", previousName);
      }
    }
  }

  /**
   * Periodically check in with the namenode and renew all the leases
   * when the lease period is half over.
   * <p>
   * Each iteration sleeps for the sleep period. Leases are renewed once the
   * renewal period has elapsed since the last successful renewal.
   * <ul>
   * <li>A {@link SocketTimeoutException} aborts renewal. The renewer is
   * removed from the factory, and every client's files being written are
   * closed.</li>
   * <li>Any other {@link IOException} is logged and retried on a later
   * iteration.</li>
   * </ul>
   * The loop ends when the thread is interrupted, this daemon's id is no
   * longer current, or the renewer has expired.
   *
   * @param id the id assigned to this daemon when it was started
   * @throws InterruptedException if interrupted while sleeping
   */
  private void run(final int id) throws InterruptedException {
    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
        Thread.sleep(getSleepPeriod())) {
      final long elapsed = Time.monotonicNow() - lastRenewed;
      if (elapsed >= getRenewalTime()) {
        try {
          renew();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Lease renewer daemon for " + clientsString()
                + " with renew id " + id + " executed");
          }
          lastRenewed = Time.monotonicNow();
        } catch (SocketTimeoutException ie) {
          LOG.warn("Failed to renew lease for " + clientsString() + " for "
              + (elapsed/1000) + " seconds.  Aborting ...", ie);
          List<DFSClient> dfsclientsCopy;
          synchronized (this) {
            DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
            // Snapshot before removal: Factory.remove clears the client list.
            dfsclientsCopy = new ArrayList<>(dfsclients);
            Factory.INSTANCE.remove(LeaseRenewer.this);
          }
          // Close files outside the monitor.
          for (DFSClient dfsClient : dfsclientsCopy) {
            dfsClient.closeAllFilesBeingWritten(true);
          }
          break;
        } catch (IOException ie) {
          LOG.warn("Failed to renew lease for " + clientsString() + " for "
              + (elapsed/1000) + " seconds.  Will retry shortly ...", ie);
        }
      }

      synchronized(this) {
        if (id != currentId || isRenewerExpired()) {
          if (LOG.isDebugEnabled()) {
            if (id != currentId) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " is not current");
            } else {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " expired");
            }
          }
          //no longer the current daemon or expired
          return;
        }

        // if no clients are in running state or there is no more clients
        // registered with this renewer, stop the daemon after the grace
        // period.
        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
          emptyTime = Time.monotonicNow();
        }
      }
    }
  }

  @Override
  public String toString() {
    String s = getClass().getSimpleName() + ":" + factorykey;
    if (LOG.isTraceEnabled()) {
      return s + ", clients=" +  clientsString()
        + ", created at " + instantiationTrace;
    }
    return s;
  }

  /** Get the names of all clients */
  private synchronized String clientsString() {
    if (dfsclients.isEmpty()) {
      return "[]";
    } else {
      final StringBuilder b = new StringBuilder("[").append(
          dfsclients.get(0).getClientName());
      for(int i = 1; i < dfsclients.size(); i++) {
        b.append(", ").append(dfsclients.get(i).getClientName());
      }
      return b.append("]").toString();
    }
  }

  /**
   * Override the grace period used by renewers created after this call.
   * Existing renewers are not affected.
   */
  @VisibleForTesting
  public static void setLeaseRenewerGraceDefault(
      long leaseRenewerGraceDefault) {
    LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BlockReaderFactory 源码

hadoop BlockReaderLocal 源码

hadoop BlockReaderLocalLegacy 源码

hadoop BlockReaderRemote 源码

hadoop BlockReaderUtil 源码

hadoop CorruptFileBlockIterator 源码

hadoop DfsClientConf 源码

hadoop ExternalBlockReader 源码

hadoop SnapshotDiffReportGenerator 源码

hadoop package-info 源码

0  赞