hadoop BlockReportLeaseManager 源码

  • 2022-10-20
  • 浏览 (246)

haddop BlockReportLeaseManager 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;

/**
 * The BlockReportLeaseManager manages block report leases.<p/>
 *
 * DataNodes request BR leases from the NameNode by sending a heartbeat with
 * the requestBlockReportLease field set.  The NameNode may choose to respond
 * with a non-zero lease ID.  If so, that DataNode can send a block report with
 * the given lease ID for the next few minutes.  The NameNode will accept
 * these full block reports.<p/>
 *
 * BR leases limit the number of incoming full block reports to the NameNode
 * at any given time.  For compatibility reasons, the NN will always accept
 * block reports sent with a lease ID of 0 and queue them for processing
 * immediately.  Full block reports which were manually triggered will also
 * have a lease ID of 0, bypassing the rate-limiting.<p/>
 *
 * Block report leases expire after a certain amount of time.  This mechanism
 * is in place so that a DN which dies while holding a lease does not
 * permanently decrease the number of concurrent block reports which the NN is
 * willing to accept.<p/>
 *
 * When considering which DNs to grant a BR lease, the NameNode gives priority
 * to the DNs which have gone the longest without sending a full block
 * report.<p/>
 */
class BlockReportLeaseManager {
  static final Logger LOG =
      LoggerFactory.getLogger(BlockReportLeaseManager.class);

  private static class NodeData {
    /**
     * The UUID of the datanode.
     */
    final String datanodeUuid;

    /**
     * The lease ID, or 0 if there is no lease.
     */
    long leaseId;

    /**
     * The time when the lease was issued, or 0 if there is no lease.
     */
    long leaseTimeMs;

    /**
     * Previous element in the list.
     */
    NodeData prev;

    /**
     * Next element in the list.
     */
    NodeData next;

    static NodeData ListHead(String name) {
      NodeData node = new NodeData(name);
      node.next = node;
      node.prev = node;
      return node;
    }

    NodeData(String datanodeUuid) {
      this.datanodeUuid = datanodeUuid;
    }

    void removeSelf() {
      if (this.prev != null) {
        this.prev.next = this.next;
      }
      if (this.next != null) {
        this.next.prev = this.prev;
      }
      this.next = null;
      this.prev = null;
    }

    void addToEnd(NodeData node) {
      Preconditions.checkState(node.next == null);
      Preconditions.checkState(node.prev == null);
      node.prev = this.prev;
      node.next = this;
      this.prev.next = node;
      this.prev = node;
    }

    void addToBeginning(NodeData node) {
      Preconditions.checkState(node.next == null);
      Preconditions.checkState(node.prev == null);
      node.next = this.next;
      node.prev = this;
      this.next.prev = node;
      this.next = node;
    }
  }

  /**
   * List of datanodes which don't currently have block report leases.
   */
  private final NodeData deferredHead = NodeData.ListHead("deferredHead");

  /**
   * List of datanodes which currently have block report leases.
   */
  private final NodeData pendingHead = NodeData.ListHead("pendingHead");

  /**
   * Maps datanode UUIDs to NodeData.
   */
  private final HashMap<String, NodeData> nodes = new HashMap<>();

  /**
   * The current length of the pending list.
   */
  private int numPending = 0;

  /**
   * The maximum number of leases to hand out at any given time.
   */
  private final int maxPending;

  /**
   * The number of milliseconds after which a lease will expire.
   */
  private final long leaseExpiryMs;

  /**
   * The next ID we will use for a block report lease.
   */
  private long nextId = ThreadLocalRandom.current().nextLong();

  BlockReportLeaseManager(Configuration conf) {
    this(conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES,
          DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT),
        conf.getLong(
          DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
          DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT));
  }

  BlockReportLeaseManager(int maxPending, long leaseExpiryMs) {
    Preconditions.checkArgument(maxPending >= 1,
        "Cannot set the maximum number of block report leases to a " +
            "value less than 1.");
    this.maxPending = maxPending;
    Preconditions.checkArgument(leaseExpiryMs >= 1,
        "Cannot set full block report lease expiry period to a value " +
         "less than 1.");
    this.leaseExpiryMs = leaseExpiryMs;
  }

  /**
   * Get the next block report lease ID.  Any number is valid except 0.
   */
  private long getNextId() {
    return ++nextId == 0L ? ++nextId : nextId;
  }

  public synchronized void register(DatanodeDescriptor dn) {
    registerNode(dn);
  }

  private synchronized NodeData registerNode(DatanodeDescriptor dn) {
    if (nodes.containsKey(dn.getDatanodeUuid())) {
      LOG.info("Can't register DN {} ({}) because it is already registered.",
          dn.getDatanodeUuid(), dn.getXferAddr());
      return null;
    }
    NodeData node = new NodeData(dn.getDatanodeUuid());
    deferredHead.addToBeginning(node);
    nodes.put(dn.getDatanodeUuid(), node);
    LOG.info("Registered DN {} ({}).", dn.getDatanodeUuid(), dn.getXferAddr());
    return node;
  }

  private synchronized void remove(NodeData node) {
    if (node.leaseId != 0) {
      numPending--;
      node.leaseId = 0;
      node.leaseTimeMs = 0;
    }
    node.removeSelf();
  }

  public synchronized void unregister(DatanodeDescriptor dn) {
    NodeData node = nodes.remove(dn.getDatanodeUuid());
    if (node == null) {
      LOG.info("Can't unregister DN {} ({}) because it is not currently " +
          "registered.", dn.getDatanodeUuid(), dn.getXferAddr());
      return;
    }
    remove(node);
  }

  public synchronized long requestLease(DatanodeDescriptor dn) {
    NodeData node = nodes.get(dn.getDatanodeUuid());
    if (node == null) {
      LOG.warn("DN {} ({}) requested a lease even though it wasn't yet " +
          "registered. Registering now.", dn.getDatanodeUuid(),
          dn.getXferAddr());
      node = registerNode(dn);
    }
    if (node.leaseId != 0) {
      // The DataNode wants a new lease, even though it already has one.
      // This can happen if the DataNode is restarted in between requesting
      // a lease and using it.
      LOG.debug("Removing existing BR lease 0x{} for DN {} ({}) in order to " +
               "issue a new one.", Long.toHexString(node.leaseId),
               dn.getDatanodeUuid(), dn.getXferAddr());
    }
    remove(node);
    long monotonicNowMs = Time.monotonicNow();
    pruneExpiredPending(monotonicNowMs);
    if (numPending >= maxPending) {
      if (LOG.isDebugEnabled()) {
        StringBuilder allLeases = new StringBuilder();
        String prefix = "";
        for (NodeData cur = pendingHead.next; cur != pendingHead;
             cur = cur.next) {
          allLeases.append(prefix).append(cur.datanodeUuid);
          prefix = ", ";
        }
        LOG.debug("Can't create a new BR lease for DN {} ({}), because " +
              "numPending equals maxPending at {}. Current leases: {}",
              dn.getDatanodeUuid(), dn.getXferAddr(), numPending, allLeases);
      }
      return 0;
    }
    numPending++;
    node.leaseId = getNextId();
    node.leaseTimeMs = monotonicNowMs;
    pendingHead.addToEnd(node);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Created a new BR lease 0x{} for DN {} ({}). numPending = {}",
          Long.toHexString(node.leaseId), dn.getDatanodeUuid(), dn.getXferAddr(), numPending);
    }
    return node.leaseId;
  }

  private synchronized boolean pruneIfExpired(long monotonicNowMs,
                                              NodeData node) {
    if (monotonicNowMs < node.leaseTimeMs + leaseExpiryMs) {
      return false;
    }
    LOG.info("Removing expired block report lease 0x{} for DN {}.",
        Long.toHexString(node.leaseId), node.datanodeUuid);
    Preconditions.checkState(node.leaseId != 0);
    remove(node);
    deferredHead.addToBeginning(node);
    return true;
  }

  private synchronized void pruneExpiredPending(long monotonicNowMs) {
    NodeData cur = pendingHead.next;
    while (cur != pendingHead) {
      NodeData next = cur.next;
      if (!pruneIfExpired(monotonicNowMs, cur)) {
        return;
      }
      cur = next;
    }
    LOG.trace("No entries remaining in the pending list.");
  }

  public synchronized boolean checkLease(DatanodeDescriptor dn,
                                         long monotonicNowMs, long id) {
    if (id == 0) {
      LOG.debug("Datanode {} ({}) is using BR lease id 0x0 to bypass " +
          "rate-limiting.", dn.getDatanodeUuid(), dn.getXferAddr());
      return true;
    }
    NodeData node = nodes.get(dn.getDatanodeUuid());
    if (node == null) {
      LOG.info("BR lease 0x{} is not valid for unknown datanode {} ({})",
          Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
      return false;
    }
    if (node.leaseId == 0) {
      LOG.warn("BR lease 0x{} is not valid for DN {} ({}), because the DN " +
               "is not in the pending set.",
               Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
      return false;
    }
    if (pruneIfExpired(monotonicNowMs, node)) {
      LOG.warn("BR lease 0x{} is not valid for DN {} ({}), because the lease " +
          "has expired.", Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
      return false;
    }
    if (id != node.leaseId) {
      LOG.warn("BR lease 0x{} is not valid for DN {} ({}). Expected BR lease 0x{}.",
          Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr(),
          Long.toHexString(node.leaseId));
      return false;
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("BR lease 0x{} is valid for DN {} ({}).",
          Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr());
    }
    return true;
  }

  public synchronized long removeLease(DatanodeDescriptor dn) {
    NodeData node = nodes.get(dn.getDatanodeUuid());
    if (node == null) {
      LOG.info("Can't remove lease for unknown datanode {} ({})",
          dn.getDatanodeUuid(), dn.getXferAddr());
      return 0;
    }
    long id = node.leaseId;
    if (id == 0) {
      LOG.debug("DN {} ({}) has no lease to remove.", dn.getDatanodeUuid(), dn.getXferAddr());
      return 0;
    }
    remove(node);
    deferredHead.addToEnd(node);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Removed BR lease 0x{} for DN {} ({}). numPending = {}",
          Long.toHexString(id), dn.getDatanodeUuid(), dn.getXferAddr(), numPending);
    }
    return id;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AvailableSpaceBlockPlacementPolicy 源码

hadoop AvailableSpaceRackFaultTolerantBlockPlacementPolicy 源码

hadoop BlockCollection 源码

hadoop BlockIdManager 源码

hadoop BlockInfo 源码

hadoop BlockInfoContiguous 源码

hadoop BlockInfoStriped 源码

hadoop BlockManager 源码

hadoop BlockManagerFaultInjector 源码

hadoop BlockManagerSafeMode 源码

0  赞