hadoop RequestHedgingProxyProvider 源码

  • 2022-10-20
  • 浏览 (17)

haddop RequestHedgingProxyProvider 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;

/**
 * A FailoverProxyProvider implementation that technically does not "failover"
 * per-se. It constructs a wrapper proxy that sends the request to ALL
 * underlying proxies simultaneously. It assumes the in an HA setup, there will
 * be only one Active, and the active should respond faster than any configured
 * standbys. Once it receive a response from any one of the configured proxies,
 * outstanding requests to other proxies are immediately cancelled.
 */
public class RequestHedgingProxyProvider<T> extends
        ConfiguredFailoverProxyProvider<T> {

  public static final Logger LOG =
      LoggerFactory.getLogger(RequestHedgingProxyProvider.class);

  class RequestHedgingInvocationHandler implements RpcInvocationHandler {

    final Map<String, ProxyInfo<T>> targetProxies;
    // Proxy of the active nn
    private volatile ProxyInfo<T> currentUsedProxy = null;

    public RequestHedgingInvocationHandler(
            Map<String, ProxyInfo<T>> targetProxies) {
      this.targetProxies = new HashMap<>(targetProxies);
    }

    /**
     * Creates a Executor and invokes all proxies concurrently. This
     * implementation assumes that Clients have configured proper socket
     * timeouts, else the call can block forever.
     *
     * @param proxy
     * @param method
     * @param args
     * @return
     * @throws Throwable
     */
    @Override
    public Object
    invoke(Object proxy, final Method method, final Object[] args)
            throws Throwable {
      // Need double check locking to guarantee thread-safe since
      // currentUsedProxy is lazily initialized.
      if (currentUsedProxy == null) {
        synchronized (this) {
          if (currentUsedProxy == null) {
            Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
            int numAttempts = 0;

            ExecutorService executor = null;
            CompletionService<Object> completionService;
            try {
              // Optimization : if only 2 proxies are configured and one had
              // failed
              // over, then we dont need to create a threadpool etc.
              targetProxies.remove(toIgnore);
              if (targetProxies.size() == 0) {
                LOG.trace("No valid proxies left");
                throw new RemoteException(IOException.class.getName(),
                    "No valid proxies left. "
                        + "All NameNode proxies have failed over.");
              }
              if (targetProxies.size() == 1) {
                ProxyInfo<T> proxyInfo =
                    targetProxies.values().iterator().next();
                try {
                  currentUsedProxy = proxyInfo;
                  Object retVal = method.invoke(proxyInfo.proxy, args);
                  LOG.debug("Invocation successful on [{}]",
                      currentUsedProxy.proxyInfo);
                  return retVal;
                } catch (InvocationTargetException ex) {
                  Exception unwrappedException =
                      unwrapInvocationTargetException(ex);
                  logProxyException(unwrappedException,
                      currentUsedProxy.proxyInfo);
                  LOG.trace("Unsuccessful invocation on [{}]",
                      currentUsedProxy.proxyInfo);
                  throw unwrappedException;
                }
              }
              executor = Executors.newFixedThreadPool(proxies.size());
              completionService = new ExecutorCompletionService<>(executor);
              // Set the callId and other informations from current thread.
              final int callId = Client.getCallId();
              final int retryCount = Client.getRetryCount();
              final Object externalHandler = Client.getExternalHandler();
              for (final Map.Entry<String, ProxyInfo<T>> pEntry : targetProxies
                  .entrySet()) {
                Callable<Object> c = new Callable<Object>() {
                  @Override
                  public Object call() throws Exception {
                    // Call Id and other informations from parent thread.
                    Client.setCallIdAndRetryCount(callId, retryCount,
                        externalHandler);
                    LOG.trace("Invoking method {} on proxy {}", method,
                        pEntry.getValue().proxyInfo);
                    return method.invoke(pEntry.getValue().proxy, args);
                  }
                };
                proxyMap.put(completionService.submit(c), pEntry.getValue());
                numAttempts++;
              }
              // Current thread's callId will not be cleared as RPC happens in
              // separate threads. Reset the CallId information Forcefully.
              Client.setCallIdAndRetryCountUnprotected(null, 0, null);
              Map<String, Exception> badResults = new HashMap<>();
              while (numAttempts > 0) {
                Future<Object> callResultFuture = completionService.take();
                Object retVal;
                try {
                  currentUsedProxy = proxyMap.get(callResultFuture);
                  retVal = callResultFuture.get();
                  LOG.debug("Invocation successful on [{}]",
                      currentUsedProxy.proxyInfo);
                  return retVal;
                } catch (ExecutionException ex) {
                  Exception unwrappedException = unwrapExecutionException(ex);
                  ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
                  logProxyException(unwrappedException, tProxyInfo.proxyInfo);
                  badResults.put(tProxyInfo.proxyInfo, unwrappedException);
                  LOG.trace("Unsuccessful invocation on [{}]",
                      tProxyInfo.proxyInfo);
                  numAttempts--;
                }
              }

              // At this point we should have All bad results (Exceptions)
              // Or should have returned with successful result.
              if (badResults.size() == 1) {
                throw badResults.values().iterator().next();
              } else {
                throw new MultiException(badResults);
              }
            } finally {
              if (executor != null) {
                LOG.trace("Shutting down threadpool executor");
                executor.shutdownNow();
              }
            }
          }
        }
      }
      // Because the above synchronized block will return or throw an exception,
      // so we don't need to do any check to prevent the first initialized
      // thread from stepping to following codes.
      try {
        Object retVal = method.invoke(currentUsedProxy.proxy, args);
        LOG.debug("Invocation successful on [{}]", currentUsedProxy.proxyInfo);
        return retVal;
      } catch (InvocationTargetException ex) {
        Exception unwrappedException = unwrapInvocationTargetException(ex);
        logProxyException(unwrappedException, currentUsedProxy.proxyInfo);
        LOG.trace("Unsuccessful invocation on [{}]",
            currentUsedProxy.proxyInfo);
        throw unwrappedException;
      }
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public ConnectionId getConnectionId() {
      if (currentUsedProxy == null) {
        return null;
      }
      return RPC.getConnectionIdForProxy(currentUsedProxy.proxy);
    }
  }

  /** A proxy wrapping {@link RequestHedgingInvocationHandler}. */
  private ProxyInfo<T> currentUsedHandler = null;
  private volatile String toIgnore = null;

  public RequestHedgingProxyProvider(Configuration conf, URI uri,
      Class<T> xface, HAProxyFactory<T> proxyFactory) {
    super(conf, uri, xface, proxyFactory);
  }

  @SuppressWarnings("unchecked")
  @Override
  public synchronized ProxyInfo<T> getProxy() {
    if (currentUsedHandler != null) {
      return currentUsedHandler;
    }
    Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
    StringBuilder combinedInfo = new StringBuilder("[");
    for (int i = 0; i < proxies.size(); i++) {
      ProxyInfo<T> pInfo = super.getProxy();
      incrementProxyIndex();
      targetProxyInfos.put(pInfo.proxyInfo, pInfo);
      combinedInfo.append(pInfo.proxyInfo).append(',');
    }
    combinedInfo.append(']');
    T wrappedProxy = (T) Proxy.newProxyInstance(
            RequestHedgingInvocationHandler.class.getClassLoader(),
            new Class<?>[]{xface},
            new RequestHedgingInvocationHandler(targetProxyInfos));
    currentUsedHandler =
        new ProxyInfo<T>(wrappedProxy, combinedInfo.toString());
    return currentUsedHandler;
  }

  @Override
  public synchronized void performFailover(T currentProxy) {
    toIgnore = ((RequestHedgingInvocationHandler) Proxy.getInvocationHandler(
        currentUsedHandler.proxy)).currentUsedProxy.proxyInfo;
    this.currentUsedHandler = null;
  }

  /**
   * Check the exception returned by the proxy log a warning message if it's
   * not a StandbyException (expected exception).
   * @param ex Exception to evaluate.
   * @param proxyInfo Information of the proxy reporting the exception.
   */
  private void logProxyException(Exception ex, String proxyInfo) {
    if (isStandbyException(ex)) {
      LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex);
    } else {
      LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex);
    }
  }

  /**
   * Check if the returned exception is caused by an standby namenode.
   * @param exception Exception to check.
   * @return If the exception is caused by an standby namenode.
   */
  private boolean isStandbyException(Exception exception) {
    if (exception instanceof RemoteException) {
      return ((RemoteException) exception).unwrapRemoteException()
          instanceof StandbyException;
    }
    return false;
  }

  /**
   * Unwraps the ExecutionException. <p>
   * Example:
   * <blockquote><pre>
   * if ex is
   * ExecutionException(InvocationTargetException(SomeException))
   * returns SomeException
   * </pre></blockquote>
   *
   * @return unwrapped exception
   */
  private Exception unwrapExecutionException(ExecutionException ex) {
    if (ex != null) {
      Throwable cause = ex.getCause();
      if (cause instanceof InvocationTargetException) {
        return
            unwrapInvocationTargetException((InvocationTargetException)cause);
      }
    }
    return ex;

  }

  /**
   * Unwraps the InvocationTargetException. <p>
   * Example:
   * <blockquote><pre>
   * if ex is InvocationTargetException(SomeException)
   * returns SomeException
   * </pre></blockquote>
   *
   * @return unwrapped exception
   */
  private Exception unwrapInvocationTargetException(
      InvocationTargetException ex) {
    if (ex != null) {
      Throwable cause = ex.getCause();
      if (cause instanceof Exception) {
        return (Exception) cause;
      }
    }
    return ex;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractNNFailoverProxyProvider 源码

hadoop ClientHAProxyFactory 源码

hadoop ConfiguredFailoverProxyProvider 源码

hadoop HAProxyFactory 源码

hadoop IPFailoverProxyProvider 源码

hadoop InMemoryAliasMapFailoverProxyProvider 源码

hadoop ObserverReadProxyProvider 源码

hadoop ObserverReadProxyProviderWithIPFailover 源码

hadoop ReadOnly 源码

hadoop WrappedFailoverProxyProvider 源码

0  赞