hadoop ClientContext source code

  • 2022-10-20

The hadoop ClientContext source code is listed below. ClientContext holds per-client state (the peer cache, short-circuit caches, key provider cache, and related settings) that can be shared across DFSClient instances.

File path: /hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.ReflectionUtils;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ClientContext contains context information for a client.
 *
 * This allows us to share caches such as the socket cache across
 * DFSClient instances.
 */
@InterfaceAudience.Private
public class ClientContext {
  private static final Logger LOG = LoggerFactory.getLogger(
      ClientContext.class);

  /**
   * Global map of context names to cached contexts.
   */
  private final static HashMap<String, ClientContext> CACHES = new HashMap<>();

  /**
   * Name of context.
   */
  private final String name;

  /**
   * The client conf used to initialize context.
   */
  private final DfsClientConf dfsClientConf;

  /**
   * String representation of the configuration.
   */
  private final String confString;

  /**
   * Caches short-circuit file descriptors, mmap regions.
   */
  private final ShortCircuitCache[] shortCircuitCache;

  /**
   * Caches TCP and UNIX domain sockets for reuse.
   */
  private final PeerCache peerCache;

  /**
   * Stores information about socket paths.
   */
  private final DomainSocketFactory domainSocketFactory;

  /**
   * Caches key providers for the DFSClient.
   */
  private final KeyProviderCache keyProviderCache;
  /**
   * True if we should use the legacy BlockReaderLocal.
   */
  private final boolean useLegacyBlockReaderLocal;

  /**
   * True if the legacy BlockReaderLocal is disabled.
   *
   * The legacy block reader local gets disabled completely whenever there is an
   * error or miscommunication.  The new block reader local code handles this
   * case more gracefully inside DomainSocketFactory.
   */
  private volatile boolean disableLegacyBlockReaderLocal = false;

  /** Creating byte[] for {@link DFSOutputStream}. */
  private final ByteArrayManager byteArrayManager;

  /**
   * Whether or not we have complained yet about a DFSClient fetching a
   * ClientContext that didn't match its configuration values.
   */
  private boolean printedConfWarning = false;

  private NodeBase clientNode;
  private boolean topologyResolutionEnabled;

  /**
   * The switch to DeadNodeDetector.
   */
  private boolean deadNodeDetectionEnabled = false;

  /**
   * Detect the dead datanodes in advance, and share this information among all
   * the DFSInputStreams in the same client.
   */
  private volatile DeadNodeDetector deadNodeDetector = null;

  /**
   * The switch for the {@link LocatedBlocksRefresher}.
   */
  private final boolean locatedBlocksRefresherEnabled;

  /**
   * Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
   * registered {@link DFSInputStream}s, to take advantage of changes in block placement.
   */
  private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;

  /**
   * Count the reference of ClientContext.
   */
  private int counter = 0;

  /**
   * ShortCircuitCache array size.
   */
  private final int clientShortCircuitNum;
  private Configuration configuration;

  private ClientContext(String name, DfsClientConf conf,
      Configuration config) {
    final ShortCircuitConf scConf = conf.getShortCircuitConf();

    this.name = name;
    this.dfsClientConf = conf;
    this.confString = scConf.confAsString();
    this.clientShortCircuitNum = conf.getClientShortCircuitNum();
    this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
    for (int i = 0; i < this.clientShortCircuitNum; i++) {
      this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
    }

    this.configuration = config;
    this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
        scConf.getSocketCacheExpiry());
    this.keyProviderCache = new KeyProviderCache(
        scConf.getKeyProviderCacheExpiryMs());
    this.useLegacyBlockReaderLocal = scConf.isUseLegacyBlockReaderLocal();
    this.domainSocketFactory = new DomainSocketFactory(scConf);

    this.byteArrayManager = ByteArrayManager.newInstance(
        conf.getWriteByteArrayManagerConf());
    this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
    this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
    initTopologyResolution(config);
  }

  private void initTopologyResolution(Configuration config) {
    topologyResolutionEnabled = config.getBoolean(
        FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED,
        FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT);
    if (!topologyResolutionEnabled) {
      return;
    }
    DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance(
        config.getClass(
            CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
            ScriptBasedMapping.class, DNSToSwitchMapping.class), config);
    String clientHostName = NetUtils.getLocalHostname();
    List<String> nodes = new ArrayList<>();
    nodes.add(clientHostName);
    List<String> resolvedHosts = dnsToSwitchMapping.resolve(nodes);
    if (resolvedHosts != null && !resolvedHosts.isEmpty() &&
        !resolvedHosts.get(0).equals(NetworkTopology.DEFAULT_RACK)) {
      // The client machine is able to resolve its own network location.
      this.clientNode = new NodeBase(clientHostName, resolvedHosts.get(0));
    }
  }

  public static ClientContext get(String name, DfsClientConf conf,
      Configuration config) {
    ClientContext context;
    synchronized(ClientContext.class) {
      context = CACHES.get(name);
      if (context == null) {
        context = new ClientContext(name, conf, config);
        CACHES.put(name, context);
      } else {
        context.printConfWarningIfNeeded(conf);
      }
    }
    context.reference();
    return context;
  }

  public static ClientContext get(String name, Configuration config) {
    return get(name, new DfsClientConf(config), config);
  }

  /**
   * Get a client context from a Configuration object.
   *
   * This method is less efficient than the version which takes a
   * DfsClientConf object, and should mostly be used by tests.
   */
  @VisibleForTesting
  public static ClientContext getFromConf(Configuration conf) {
    return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), conf);
  }

  private void printConfWarningIfNeeded(DfsClientConf conf) {
    String existing = this.getConfString();
    String requested = conf.getShortCircuitConf().confAsString();
    if (!existing.equals(requested)) {
      if (!printedConfWarning) {
        printedConfWarning = true;
        LOG.warn("Existing client context '" + name + "' does not match " +
            "requested configuration.  Existing: " + existing +
            ", Requested: " + requested);
      }
    }
  }

  public String getConfString() {
    return confString;
  }

  public ShortCircuitCache getShortCircuitCache() {
    return shortCircuitCache[0];
  }

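  /**
   * Select one of the striped short-circuit caches by index (callers
   * typically pass a block ID), spreading load and lock contention across
   * the clientShortCircuitNum cache instances.
   */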
  public ShortCircuitCache getShortCircuitCache(long idx) {
    return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
  }

  public PeerCache getPeerCache() {
    return peerCache;
  }

  public KeyProviderCache getKeyProviderCache() {
    return keyProviderCache;
  }

  public boolean getUseLegacyBlockReaderLocal() {
    return useLegacyBlockReaderLocal;
  }

  public boolean getDisableLegacyBlockReaderLocal() {
    return disableLegacyBlockReaderLocal;
  }

  public void setDisableLegacyBlockReaderLocal() {
    disableLegacyBlockReaderLocal = true;
  }

  public DomainSocketFactory getDomainSocketFactory() {
    return domainSocketFactory;
  }

  public ByteArrayManager getByteArrayManager() {
    return byteArrayManager;
  }

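  /**
   * Compute the network distance between this client and the given datanode.
   * When the client's own network location is unknown (topology resolution
   * disabled or failed), falls back to 0 for a local datanode and
   * Integer.MAX_VALUE otherwise.
   */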
  public int getNetworkDistance(DatanodeInfo datanodeInfo) throws IOException {
    // If applications disable the feature or the client machine can't
    // resolve its network location, clientNode will be set to null.
    if (clientNode == null) {
      return DFSUtilClient.isLocalAddress(NetUtils.
          createSocketAddr(datanodeInfo.getXferAddr())) ? 0 :
          Integer.MAX_VALUE;
    }
    NodeBase node = new NodeBase(datanodeInfo.getHostName(),
        datanodeInfo.getNetworkLocation());
    return NetworkTopology.getDistanceByPath(clientNode, node);
  }

  /**
   * The switch to DeadNodeDetector. If true, DeadNodeDetector is available.
   */
  public boolean isDeadNodeDetectionEnabled() {
    return deadNodeDetectionEnabled;
  }

  /**
   * Obtain DeadNodeDetector of the current client.
   */
  public DeadNodeDetector getDeadNodeDetector() {
    return deadNodeDetector;
  }

  /**
   * If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
   * of registered DFSInputStreams.
   */
  public boolean isLocatedBlocksRefresherEnabled() {
    return locatedBlocksRefresherEnabled;
  }

  /**
   * Obtain LocatedBlocksRefresher of the current client.
   */
  public LocatedBlocksRefresher getLocatedBlocksRefresher() {
    return locatedBlocksRefresher;
  }

  /**
   * Increment the reference counter, and lazily start the dead node detector
   * and located-blocks refresher threads if they are enabled but not yet
   * running.
   */
  synchronized void reference() {
    counter++;
    if (deadNodeDetectionEnabled && deadNodeDetector == null) {
      deadNodeDetector = new DeadNodeDetector(name, configuration);
      deadNodeDetector.start();
    }
    if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
      locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
      locatedBlocksRefresher.start();
    }
  }

  /**
   * Decrement the reference counter, and shut down the dead node detector
   * and located-blocks refresher threads once the last reference is released.
   */
  synchronized void unreference() {
    Preconditions.checkState(counter > 0);
    counter--;
    if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
      deadNodeDetector.shutdown();
      deadNodeDetector = null;
    }

    if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
      locatedBlocksRefresher.shutdown();
      locatedBlocksRefresher = null;
    }
  }
}
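
For context, here is a minimal usage sketch (not part of the file above; the class and context name are illustrative). Two lookups that resolve to the same dfs.client.context name return the same cached ClientContext, so the peer cache, short-circuit caches, and key provider cache are shared between them. In normal operation DFSClient performs this lookup itself and releases the context via the package-private unreference() when it closes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class ClientContextDemo {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // "demo-context" is an arbitrary name; the default is "default".
    conf.set(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, "demo-context");

    // Both calls hit the same entry in the CACHES map above.
    ClientContext a = ClientContext.getFromConf(conf);
    ClientContext b = ClientContext.getFromConf(conf);
    System.out.println(a == b); // prints: true

    // Each get() incremented the reference count; DFSClient normally
    // balances this with the package-private unreference() on close.
  }
}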

Related information

hadoop source code directory

Related articles

hadoop AddBlockFlag source code

hadoop BlockMissingException source code

hadoop BlockReader source code

hadoop CannotObtainBlockLengthException source code

hadoop ClientGSIContext source code

hadoop DFSClient source code

hadoop DFSClientFaultInjector source code

hadoop DFSHedgedReadMetrics source code

hadoop DFSInotifyEventInputStream source code

hadoop DFSInputStream source code
