hadoop WasbRemoteCallHelper 源码

  • 2022-10-20
  • 浏览 (185)

haddop WasbRemoteCallHelper 代码

文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;

/**
 * Helper class the has constants and helper methods
 * used in WASB when integrating with a remote http cred
 * service. Currently, remote service will be used to generate
 * SAS keys.
 */
public class WasbRemoteCallHelper {

  public static final Logger LOG =
      LoggerFactory.getLogger(WasbRemoteCallHelper.class);
  /**
   * Return code when the remote call is successful. {@value}
   */
  public static final int REMOTE_CALL_SUCCESS_CODE = 0;

  /**
   * Application Json content type.
   */
  private static final String APPLICATION_JSON = "application/json";

  /**
   * Max content length of the response.
   */
  private static final int MAX_CONTENT_LENGTH = 1024;

  /**
   * Client instance to be used for making the remote call.
   */
  private HttpClient client = null;

  private Random random = new Random();

  private RetryPolicy retryPolicy = null;

  public WasbRemoteCallHelper(RetryPolicy retryPolicy) {
    this.client = HttpClientBuilder.create().build();
    this.retryPolicy = retryPolicy;
  }

  @VisibleForTesting public void updateHttpClient(HttpClient client) {
    this.client = client;
  }

  /**
   * Helper method to make remote HTTP Get request.
   *
   * @param urls        - Service urls to be used, if one fails try another.
   * @param path        - URL endpoint for the resource.
   * @param queryParams - list of query parameters
   * @param httpMethod  - http Method to be used.
   * @return Http Response body returned as a string. The caller
   * is expected to semantically understand the response.
   * @throws IOException when there an error in executing the remote http request.
   */
  public String makeRemoteRequest(String[] urls, String path,
      List<NameValuePair> queryParams, String httpMethod) throws IOException {

    return retryableRequest(urls, path, queryParams, httpMethod);
  }

  protected String retryableRequest(String[] urls, String path,
      List<NameValuePair> queryParams, String httpMethod) throws IOException {
    HttpResponse response = null;
    HttpUriRequest httpRequest = null;

    /**
     * Get the index of local url if any. If list of urls contains strings like
     * "https://localhost:" or "http://localhost", consider it as local url and
     * give it affinity more than other urls in the list.
     */

    int indexOfLocalUrl = -1;
    for (int i = 0; i < urls.length; i++) {
      if (urls[i].toLowerCase().startsWith("https://localhost:") || urls[i]
          .toLowerCase().startsWith("http://localhost:")) {
        indexOfLocalUrl = i;
      }
    }

    boolean requiresNewAuth = false;
    for (int retry = 0, index = (indexOfLocalUrl != -1)
                                ? indexOfLocalUrl
                                : random
                                    .nextInt(urls.length);; retry++, index++) {
      if (index >= urls.length) {
        index = index % urls.length;
      }
      /**
       * If the first request fails to localhost, then randomly pick the next url
       * from the remaining urls in the list, so that load can be balanced.
       */
      if (indexOfLocalUrl != -1 && retry == 1) {
        index = (index + random.nextInt(urls.length)) % urls.length;
        if (index == indexOfLocalUrl) {
          index = (index + 1) % urls.length;
        }
      }
      try {
        httpRequest =
            getHttpRequest(urls, path, queryParams, index, httpMethod,
                requiresNewAuth);
        httpRequest.setHeader("Accept", APPLICATION_JSON);
        response = client.execute(httpRequest);
        StatusLine statusLine = response.getStatusLine();
        if (statusLine == null
            || statusLine.getStatusCode() != HttpStatus.SC_OK) {
          requiresNewAuth =
              (statusLine == null)
                  || (statusLine.getStatusCode() == HttpStatus.SC_UNAUTHORIZED);

          throw new WasbRemoteCallException(
              httpRequest.getURI().toString() + ":" + ((statusLine != null)
                                                       ? statusLine.toString()
                                                       : "NULL"));
        } else {
          requiresNewAuth = false;
        }

        Header contentTypeHeader = response.getFirstHeader("Content-Type");
        if (contentTypeHeader == null || !APPLICATION_JSON
            .equals(contentTypeHeader.getValue())) {
          throw new WasbRemoteCallException(
              httpRequest.getURI().toString() + ":"
                  + "Content-Type mismatch: expected: " + APPLICATION_JSON
                  + ", got " + ((contentTypeHeader != null) ? contentTypeHeader
                  .getValue() : "NULL"));
        }

        Header contentLengthHeader = response.getFirstHeader("Content-Length");
        if (contentLengthHeader == null) {
          throw new WasbRemoteCallException(
              httpRequest.getURI().toString() + ":"
                  + "Content-Length header missing");
        }

        try {
          if (Integer.parseInt(contentLengthHeader.getValue())
              > MAX_CONTENT_LENGTH) {
            throw new WasbRemoteCallException(
                httpRequest.getURI().toString() + ":" + "Content-Length:"
                    + contentLengthHeader.getValue() + "exceeded max:"
                    + MAX_CONTENT_LENGTH);
          }
        } catch (NumberFormatException nfe) {
          throw new WasbRemoteCallException(
              httpRequest.getURI().toString() + ":"
                  + "Invalid Content-Length value :" + contentLengthHeader
                  .getValue());
        }

        BufferedReader rd = null;
        StringBuilder responseBody = new StringBuilder();
        try {
          rd = new BufferedReader(
              new InputStreamReader(response.getEntity().getContent(),
                  StandardCharsets.UTF_8));
          String responseLine = "";
          while ((responseLine = rd.readLine()) != null) {
            responseBody.append(responseLine);
          }
        } finally {
          rd.close();
        }
        return responseBody.toString();
      } catch (URISyntaxException uriSyntaxEx) {
        throw new WasbRemoteCallException("Encountered URISyntaxException "
            + "while building the HttpGetRequest to remote service",
            uriSyntaxEx);
      } catch (IOException e) {
        LOG.debug(e.getMessage(), e);
        try {
          shouldRetry(e, retry, (httpRequest != null)
                                ? httpRequest.getURI().toString()
                                : urls[index]);
        } catch (IOException ioex) {
          String message =
              "Encountered error while making remote call to " + String
                  .join(",", urls) + " retried " + retry + " time(s).";
          LOG.error(message, ioex);
          throw new WasbRemoteCallException(message, ioex);
        }
      }
    }
  }

  protected HttpUriRequest getHttpRequest(String[] urls, String path,
      List<NameValuePair> queryParams, int urlIndex, String httpMethod,
      boolean requiresNewAuth) throws URISyntaxException, IOException {
    URIBuilder uriBuilder = null;
    uriBuilder =
        new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
    if (uriBuilder.getHost().equals("localhost")) {
      uriBuilder.setHost(InetAddress.getLocalHost().getCanonicalHostName());
    }
    HttpUriRequest httpUriRequest = null;
    switch (httpMethod) {
    case HttpPut.METHOD_NAME:
      httpUriRequest = new HttpPut(uriBuilder.build());
      break;
    case HttpPost.METHOD_NAME:
      httpUriRequest = new HttpPost(uriBuilder.build());
      break;
    default:
      httpUriRequest = new HttpGet(uriBuilder.build());
      break;
    }
    return httpUriRequest;
  }

  private void shouldRetry(final IOException ioe, final int retry,
      final String url) throws IOException {
    CharSequence authenticationExceptionMessage =
        Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE;
    if (ioe instanceof WasbRemoteCallException && ioe.getMessage()
        .equals(authenticationExceptionMessage)) {
      throw ioe;
    }
    try {
      final RetryPolicy.RetryAction a = (retryPolicy != null)
                                        ? retryPolicy
                                            .shouldRetry(ioe, retry, 0, true)
                                        : RetryPolicy.RetryAction.FAIL;

      boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
      boolean isFailoverAndRetry =
          a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;

      if (isRetry || isFailoverAndRetry) {
        LOG.debug("Retrying connect to Remote service:{}. Already tried {}"
                + " time(s); retry policy is {}, " + "delay {}ms.", url, retry,
            retryPolicy, a.delayMillis);

        Thread.sleep(a.delayMillis);
        return;
      }
    } catch (InterruptedIOException e) {
      LOG.warn(e.getMessage(), e);
      Thread.currentThread().interrupt();
      return;
    } catch (Exception e) {
      LOG.warn("Original exception is ", ioe);
      throw new WasbRemoteCallException(e.getMessage(), e);
    }
    LOG.debug("Not retrying anymore, already retried the urls {} time(s)",
        retry);
    throw new WasbRemoteCallException(
        url + ":" + "Encountered IOException while making remote call", ioe);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AzureException 源码

hadoop AzureFileSystemThreadPoolExecutor 源码

hadoop AzureFileSystemThreadTask 源码

hadoop AzureLinkedStack 源码

hadoop AzureNativeFileSystemStore 源码

hadoop BlobMaterialization 源码

hadoop BlobOperationDescriptor 源码

hadoop BlockBlobAppendStream 源码

hadoop BlockBlobInputStream 源码

hadoop CachingAuthorizer 源码

0  赞