hadoop SelfThrottlingIntercept 源码

  • 2022-10-20
  • 浏览 (240)

hadoop SelfThrottlingIntercept 代码

文件路径:/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import java.net.HttpURLConnection;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;

import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RequestResult;
import com.microsoft.azure.storage.ResponseReceivedEvent;
import com.microsoft.azure.storage.SendingRequestEvent;
import com.microsoft.azure.storage.StorageEvent;

/*
 * Self throttling is implemented by hooking into send & response callbacks 
 * One instance of this class is created per operationContext, i.e. one per blobUpload/blobDownload/etc. operation.
 * 
 * Self throttling only applies to 2nd and subsequent packets of an operation.  This is a simple way to 
 * ensure it only affects bulk transfers and not every tiny request.
 * 
 * A blobDownload will involve sequential packet transmissions and so there are no concurrency concerns
 * A blobUpload will generally involve concurrent upload worker threads that share one operationContext and one throttling instance.
 *   -- we do not track the latencies for each worker thread as they are doing similar work and will rarely collide in practice.  
 *   -- concurrent access to lastE2Edelay must be protected.  
 *       -- volatile is necessary and should be sufficient to protect simple access to primitive values (java 1.5 onwards) 
 *       -- synchronized{} blocks are also used to be conservative and for easier maintenance.
 *   
 * If an operation were to perform concurrent GETs and PUTs there is the possibility of getting confused regarding
 * whether lastE2Edelay was a read or write measurement.  This scenario does not occur.
 *
 * readFactor  = target read throughput as factor of unrestricted throughput.
 * writeFactor = target write throughput as factor of unrestricted throughput.
 * 
 * As we introduce delays it is important to only measure the actual E2E latency and not the augmented latency
 * To achieve this, we fiddle the 'startDate' of the transfer tracking object.
 */


/**
 * 
 * Introduces delays in our Azure traffic to prevent overrunning the server-side throttling limits.
 *
 */
/**
 * Injects artificial delays into outgoing Azure traffic so a single client
 * does not overrun the server-side throttling limits. One instance is wired
 * into each {@link OperationContext} via {@link #hook}.
 */
@InterfaceAudience.Private
public class SelfThrottlingIntercept {
  public static final Log LOG = LogFactory
      .getLog(SelfThrottlingIntercept.class);

  /** Target read throughput expressed as a fraction of unrestricted throughput. */
  private final float readFactor;

  /** Target write throughput expressed as a fraction of unrestricted throughput. */
  private final float writeFactor;

  private final OperationContext operationContext;

  // Last observed end-to-end request latency in milliseconds. Upload worker
  // threads may touch this concurrently, so every access is guarded by a
  // synchronized block on this instance.
  private long lastE2Elatency;

  public SelfThrottlingIntercept(OperationContext operationContext,
      float readFactor, float writeFactor) {
    this.operationContext = operationContext;
    this.readFactor = readFactor;
    this.writeFactor = writeFactor;
  }

  /**
   * Creates a throttler for the given operation context and registers its
   * send/response listeners on that context.
   *
   * @param operationContext context shared by all requests of one operation
   * @param readFactor target read throughput fraction (0..1]
   * @param writeFactor target write throughput fraction (0..1]
   */
  public static void hook(OperationContext operationContext, float readFactor,
      float writeFactor) {

    SelfThrottlingIntercept throttler = new SelfThrottlingIntercept(
        operationContext, readFactor, writeFactor);

    operationContext.getResponseReceivedEventHandler().addListener(
        throttler.new ResponseReceivedListener());
    operationContext.getSendingRequestEventHandler().addListener(
        throttler.new SendingRequestListener());
  }

  /**
   * Records the end-to-end latency of the request that just completed; that
   * measurement drives the delay inserted before the next request.
   *
   * @param event the response-received event from the storage client
   */
  public void responseReceived(ResponseReceivedEvent event) {
    RequestResult result = event.getRequestResult();
    Date begin = result.getStartDate();
    Date end = result.getStopDate();
    long e2eMillis = end.getTime() - begin.getTime();

    synchronized (this) {
      this.lastE2Elatency = e2eMillis;
    }

    if (LOG.isDebugEnabled()) {
      HttpURLConnection conn = (HttpURLConnection) event
          .getConnectionObject();
      LOG.debug(String
          .format(
              "SelfThrottlingIntercept:: ResponseReceived: threadId=%d, Status=%d, Elapsed(ms)=%d, ETAG=%s, contentLength=%d, requestMethod=%s",
              Thread.currentThread().getId(), result.getStatusCode(),
              e2eMillis, result.getEtag(), conn.getContentLength(),
              conn.getRequestMethod()));
    }
  }

  /**
   * Called just before a request is transmitted. Sleeps for a duration
   * proportional to the last observed latency (scaled by the read or write
   * factor), then resets the request's start date so the sleep is not
   * counted as part of the measured end-to-end latency.
   *
   * @param sendEvent the sending-request event from the storage client
   */
  public void sendingRequest(SendingRequestEvent sendEvent) {
    long observedLatency;
    synchronized (this) {
      observedLatency = this.lastE2Elatency;
    }

    HttpURLConnection conn = (HttpURLConnection) sendEvent
        .getConnectionObject();

    // The Azure REST API never issues POST, so a PUT method reliably
    // identifies an upload; every other method is treated as a read.
    final boolean writeOperation = conn.getRequestMethod()
        .equalsIgnoreCase("PUT");
    float delayFactor = (writeOperation ? (1 / writeFactor)
        : (1 / readFactor)) - 1;

    // Clamp to zero so a factor >= 1 (no throttling) never yields a
    // negative duration.
    long delayMillis = Math.max(0L, (long) (delayFactor * observedLatency));

    if (delayMillis > 0) {
      try {
        // Thread.sleep() is imprecise but close enough for throttling; a
        // loop of short waits tracking actual elapsed time could replace it
        // if more accuracy were ever needed.
        Thread.sleep(delayMillis);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }

      // Move the start date forward so the sleep we just inserted is not
      // attributed to the request's measured latency.
      sendEvent.getRequestResult().setStartDate(new Date());
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(String
          .format(
              " SelfThrottlingIntercept:: SendingRequest:   threadId=%d, requestType=%s, isFirstRequest=%b, sleepDuration=%d",
              Thread.currentThread().getId(),
              writeOperation ? "write" : "read ", observedLatency == 0,
              delayMillis));
    }
  }

  /**
   * Forwards sending-request events to the enclosing throttler. Needed
   * because the outer class cannot extend two StorageEvent base classes.
   */
  @InterfaceAudience.Private
  class SendingRequestListener extends StorageEvent<SendingRequestEvent> {

    @Override
    public void eventOccurred(SendingRequestEvent event) {
      sendingRequest(event);
    }
  }

  /**
   * Forwards response-received events to the enclosing throttler. Needed
   * because the outer class cannot extend two StorageEvent base classes.
   */
  @InterfaceAudience.Private
  class ResponseReceivedListener extends StorageEvent<ResponseReceivedEvent> {

    @Override
    public void eventOccurred(ResponseReceivedEvent event) {
      responseReceived(event);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AzureException 源码

hadoop AzureFileSystemThreadPoolExecutor 源码

hadoop AzureFileSystemThreadTask 源码

hadoop AzureLinkedStack 源码

hadoop AzureNativeFileSystemStore 源码

hadoop BlobMaterialization 源码

hadoop BlobOperationDescriptor 源码

hadoop BlockBlobAppendStream 源码

hadoop BlockBlobInputStream 源码

hadoop CachingAuthorizer 源码

0  赞