hadoop LogAggregationService source code

  • 2022-10-20

hadoop LogAggregationService code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.security.token.SecretManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;


import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

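/**
 * Per-NodeManager service that aggregates the container logs of applications
 * running on this node and uploads them to the remote log directory (for
 * example on HDFS). It implements {@link LogHandler}, so it is driven by
 * LogHandlerEvents from the NodeManager dispatcher and schedules one
 * AppLogAggregator per application on an internal thread pool.
 */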
public class LogAggregationService extends AbstractService implements
    LogHandler {

  private static final Logger LOG =
       LoggerFactory.getLogger(LogAggregationService.class);
  // This configuration is for debug and test purposes. By setting it to true,
  // we can break the lower bound of
  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
  private long rollingMonitorInterval;

  private final Context context;
  private final DeletionService deletionService;
  private final Dispatcher dispatcher;

  private LocalDirsHandlerService dirsHandler;
  private NodeId nodeId;

  private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;

  // Holds applications whose aggregation is disabled due to an invalid Token
  private final Set<ApplicationId> invalidTokenApps;

  @VisibleForTesting
  ExecutorService threadPool;
  
  public LogAggregationService(Dispatcher dispatcher, Context context,
      DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
    super(LogAggregationService.class.getName());
    this.dispatcher = dispatcher;
    this.context = context;
    this.deletionService = deletionService;
    this.dirsHandler = dirsHandler;
    this.appLogAggregators =
        new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
    this.invalidTokenApps = ConcurrentHashMap.newKeySet();
  }

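  // Reads the rolling-monitor interval from configuration. A non-positive value
  // disables rolling aggregation (logs are aggregated only when the application
  // finishes); otherwise the value is clamped to the configured hard minimum,
  // unless the debug-only override is enabled.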
  private static long calculateRollingMonitorInterval(Configuration conf) {
    long interval = conf.getLong(
        YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
        YarnConfiguration.
            DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);

    if (interval <= 0) {
      LOG.info("rollingMonitorInterval is set as " + interval
          + ". The log rolling monitoring interval is disabled. "
          + "The logs will be aggregated after this application is finished.");
    } else {
      boolean logAggregationDebugMode =
          conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
      long minRollingMonitorInterval = conf.getLong(
          YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS,
          YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT);

      boolean warnHardMinLimitLowerThanDefault = minRollingMonitorInterval <
          YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT &&
          !logAggregationDebugMode;
      if (warnHardMinLimitLowerThanDefault) {
        LOG.warn("{} has been set to {}, which is less than the default "
            + "minimum value {}. This may impact NodeManager's performance.",
            YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS,
            minRollingMonitorInterval,
            YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT);
      }
      boolean lowerThanHardLimit = interval < minRollingMonitorInterval;
      if (lowerThanHardLimit) {
        if (logAggregationDebugMode) {
          LOG.info("Log aggregation debug mode enabled. " +
              "Skipped checking minimum limit.");
        } else {
          LOG.warn("rollingMonitorInterval should be more than " +
              "or equal to {} seconds. Using {} seconds instead.",
              minRollingMonitorInterval, minRollingMonitorInterval);
          interval = minRollingMonitorInterval;
        }
      }
    }
    return interval;
  }

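  // Creates the fixed-size thread pool that runs one AppLogAggregator per
  // application, and resolves the rolling-monitor interval.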
  protected void serviceInit(Configuration conf) throws Exception {
    int threadPoolSize = getAggregatorThreadPoolSize(conf);
    this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
        new ThreadFactoryBuilder()
            .setNameFormat("LogAggregationService #%d")
            .build());

    rollingMonitorInterval = calculateRollingMonitorInterval(conf);
    LOG.info("rollingMonitorInterval is set as {}. The logs will be " +
        "aggregated every {} seconds", rollingMonitorInterval,
        rollingMonitorInterval);

    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    // NodeId is only available during start, the following cannot be moved
    // anywhere else.
    this.nodeId = this.context.getNodeId();
    super.serviceStart();
  }
  
  @Override
  protected void serviceStop() throws Exception {
    LOG.info(this.getName() + " waiting for pending aggregation during exit");
    stopAggregators();
    super.serviceStop();
  }
   
  private void stopAggregators() {
    threadPool.shutdown();
    boolean supervised = getConfig().getBoolean(
        YarnConfiguration.NM_RECOVERY_SUPERVISED,
        YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
    // if recovery on restart is supported then leave outstanding aggregations
    // to the next restart
    boolean shouldAbort = context.getNMStateStore().canRecover()
        && !context.getDecommissioned() && supervised;
    // politely ask to finish
    for (AppLogAggregator aggregator : appLogAggregators.values()) {
      if (shouldAbort) {
        aggregator.abortLogAggregation();
      } else {
        aggregator.finishLogAggregation();
      }
    }
    while (!threadPool.isTerminated()) { // wait for all threads to finish
      for (ApplicationId appId : appLogAggregators.keySet()) {
        LOG.info("Waiting for aggregation to complete for " + appId);
      }
      try {
        if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
          threadPool.shutdownNow(); // send interrupt to hurry them along
        }
      } catch (InterruptedException e) {
        LOG.warn("Aggregation stop interrupted!");
        break;
      }
    }
    for (ApplicationId appId : appLogAggregators.keySet()) {
      LOG.warn("Some logs may not have been aggregated for " + appId);
    }
  }

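  // Initializes log aggregation for a newly started application and reports the
  // outcome back via the dispatcher: APPLICATION_LOG_HANDLING_INITED on success,
  // APPLICATION_LOG_HANDLING_FAILED on failure.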
  @SuppressWarnings("unchecked")
  private void initApp(final ApplicationId appId, String user,
      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
      LogAggregationContext logAggregationContext,
      long recoveredLogInitedTime) {
    ApplicationEvent eventResponse;
    try {
      initAppAggregator(appId, user, credentials, appAcls,
          logAggregationContext, recoveredLogInitedTime);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
    } catch (YarnRuntimeException e) {
      LOG.warn("Application failed to init aggregation", e);
      eventResponse = new ApplicationEvent(appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
    }
    this.dispatcher.getEventHandler().handle(eventResponse);
  }
  
  FileContext getLocalFileContext(Configuration conf) {
    try {
      return FileContext.getLocalFSFileContext(conf);
    } catch (IOException e) {
      throw new YarnRuntimeException("Failed to access local fs");
    }
  }

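  // Creates the per-application AppLogAggregatorImpl: resolves the user's UGI,
  // verifies the remote log directory, creates the application directory, and
  // schedules the aggregator on the thread pool. If the app dir cannot be
  // created, aggregation is disabled for the app and the exception is re-thrown
  // after the aggregator has been scheduled.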
  protected void initAppAggregator(final ApplicationId appId, String user,
      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
      LogAggregationContext logAggregationContext,
      long recoveredLogInitedTime) {

    // Get user's FileSystem credentials
    final UserGroupInformation userUgi =
        UserGroupInformation.createRemoteUser(user);
    if (credentials != null) {
      userUgi.addCredentials(credentials);
    }

    LogAggregationFileController logAggregationFileController =
        getLogAggregationFileController(getConfig());
    logAggregationFileController.verifyAndCreateRemoteLogDir();
    // New application
    final AppLogAggregator appLogAggregator =
        new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
            getConfig(), appId, userUgi, this.nodeId, dirsHandler,
            logAggregationFileController.getRemoteNodeLogFileForApp(appId,
            user, nodeId), appAcls, logAggregationContext, this.context,
            getLocalFileContext(getConfig()), this.rollingMonitorInterval,
            recoveredLogInitedTime, logAggregationFileController);
    if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
      throw new YarnRuntimeException("Duplicate initApp for " + appId);
    }
    // wait until check for existing aggregator to create dirs
    YarnRuntimeException appDirException = null;
    try {
      // Create the app dir
      logAggregationFileController.createAppDir(user, appId, userUgi);
    } catch (Exception e) {
      appLogAggregator.disableLogAggregation();

      // add to disabled aggregators if due to InvalidToken
      if (e.getCause() instanceof SecretManager.InvalidToken) {
        invalidTokenApps.add(appId);
      }
      if (!(e instanceof YarnRuntimeException)) {
        appDirException = new YarnRuntimeException(e);
      } else {
        appDirException = (YarnRuntimeException)e;
      }
    }

    // TODO Get the user configuration for the list of containers that need log
    // aggregation.

    // Schedule the aggregator.
    Runnable aggregatorWrapper = new Runnable() {
      public void run() {
        try {
          appLogAggregator.run();
        } finally {
          appLogAggregators.remove(appId);
          closeFileSystems(userUgi);
        }
      }
    };
    this.threadPool.execute(aggregatorWrapper);

    if (appDirException != null) {
      throw appDirException;
    }
  }

  protected void closeFileSystems(final UserGroupInformation userUgi) {
    try {
      FileSystem.closeAllForUGI(userUgi);
    } catch (IOException e) {
      LOG.warn("Failed to close filesystems: ", e);
    }
  }

  // for testing only
  @Private
  int getNumAggregators() {
    return this.appLogAggregators.size();
  }

  private void stopContainer(ContainerId containerId,
      ContainerType containerType, int exitCode) {

    // A container is complete. Put this containers' logs up for aggregation if
    // this containers' logs are needed.
    AppLogAggregator aggregator = this.appLogAggregators.get(
        containerId.getApplicationAttemptId().getApplicationId());
    if (aggregator == null) {
      LOG.warn("Log aggregation is not initialized for " + containerId
          + ", did it fail to start?");
      return;
    }
    aggregator.startContainerLogAggregation(
        new ContainerLogContext(containerId, containerType, exitCode));
  }

  @SuppressWarnings("unchecked")
  private void stopApp(ApplicationId appId) {

    // App is complete. Finish up any containers' pending log aggregation and
    // close the application specific logFile.
    try {
      AppLogAggregator aggregator = this.appLogAggregators.get(appId);
      if (aggregator == null) {
        LOG.warn("Log aggregation is not initialized for " + appId
            + ", did it fail to start?");
        this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId,
            ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
        return;
      }
      aggregator.finishLogAggregation();
    } finally {
      // Remove invalid Token Apps
      invalidTokenApps.remove(appId);
    }
  }

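  // LogHandler entry point: routes APPLICATION_STARTED, CONTAINER_FINISHED,
  // APPLICATION_FINISHED and LOG_AGG_TOKEN_UPDATE events to the corresponding
  // init/stop/re-enable logic; any other event type is ignored.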
  @Override
  public void handle(LogHandlerEvent event) {
    switch (event.getType()) {
      case APPLICATION_STARTED:
        LogHandlerAppStartedEvent appStartEvent =
            (LogHandlerAppStartedEvent) event;
        initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
            appStartEvent.getCredentials(),
            appStartEvent.getApplicationAcls(),
            appStartEvent.getLogAggregationContext(),
            appStartEvent.getRecoveredAppLogInitedTime());
        break;
      case CONTAINER_FINISHED:
        LogHandlerContainerFinishedEvent containerFinishEvent =
            (LogHandlerContainerFinishedEvent) event;
        stopContainer(containerFinishEvent.getContainerId(),
            containerFinishEvent.getContainerType(),
            containerFinishEvent.getExitCode());
        break;
      case APPLICATION_FINISHED:
        LogHandlerAppFinishedEvent appFinishedEvent =
            (LogHandlerAppFinishedEvent) event;
        stopApp(appFinishedEvent.getApplicationId());
        break;
      case LOG_AGG_TOKEN_UPDATE:
        checkAndEnableAppAggregators();
        break;
      default:
        ; // Ignore
    }

  }

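  // Retries applications whose aggregation was disabled by an invalid token:
  // once fresh system credentials are available for an app, its app dir is
  // created again and aggregation is re-enabled.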
  private void checkAndEnableAppAggregators() {
    for (ApplicationId appId : invalidTokenApps) {
      try {
        AppLogAggregator aggregator = appLogAggregators.get(appId);
        if (aggregator != null) {
          Credentials credentials =
              context.getSystemCredentialsForApps().get(appId);
          if (credentials != null) {
            // Create the app dir again with the updated credentials
            LogAggregationFileController logAggregationFileController =
                getLogAggregationFileController(getConfig());
            UserGroupInformation userUgi =
                aggregator.updateCredentials(credentials);
            logAggregationFileController
                .createAppDir(userUgi.getShortUserName(), appId, userUgi);
            aggregator.enableLogAggregation();
          }
          invalidTokenApps.remove(appId);
          LOG.info("LogAggregation enabled for application {}", appId);
        }
      } catch (Exception e) {
        //Ignore exception
        LOG.warn("Enable aggregators failed {}", appId);
      }
    }
  }

  @Override
  public Set<ApplicationId> getInvalidTokenApps() {
    return invalidTokenApps;
  }

  @VisibleForTesting
  public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
    return this.appLogAggregators;
  }

  @VisibleForTesting
  public NodeId getNodeId() {
    return this.nodeId;
  }

  @VisibleForTesting
  public long getRollingMonitorInterval() {
    return rollingMonitorInterval;
  }

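  // Reads the aggregator thread pool size from configuration, falling back to
  // the default when the configured value is malformed or non-positive.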
  private int getAggregatorThreadPoolSize(Configuration conf) {
    int threadPoolSize;
    try {
      threadPoolSize = conf.getInt(YarnConfiguration
          .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
          YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
    } catch (NumberFormatException ex) {
      LOG.warn("Invalid thread pool size. Setting it to the default value " +
          "in YarnConfiguration");
      threadPoolSize = YarnConfiguration.
          DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
    }
    if(threadPoolSize <= 0) {
      LOG.warn("Invalid thread pool size. Setting it to the default value " +
          "in YarnConfiguration");
      threadPoolSize = YarnConfiguration.
          DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
    }
    return threadPoolSize;
  }

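  // Selects the LogAggregationFileController used to write aggregated logs,
  // based on the file controllers configured for this cluster.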
  @VisibleForTesting
  public LogAggregationFileController getLogAggregationFileController(
      Configuration conf) {
    LogAggregationFileControllerFactory factory
        = new LogAggregationFileControllerFactory(conf);
    LogAggregationFileController logAggregationFileController = factory
        .getFileControllerForWrite();
    return logAggregationFileController;
  }
}

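The behaviour of this service is driven mainly by a handful of NodeManager configuration keys that the code above reads. Below is a minimal sketch of setting them programmatically (for example in a test). It is not part of the Hadoop source; the class name and the concrete values (3600 seconds, 30 threads) are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class LogAggregationConfSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Roll and upload logs periodically while the application is still running;
    // a value <= 0 means logs are aggregated only after the application finishes.
    conf.setLong(
        YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, 3600L);
    // Size of the thread pool that runs one AppLogAggregator per application.
    conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, 30);
    System.out.println("rolling monitor interval (s) = " + conf.getLong(
        YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, -1L));
  }
}
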
Related information

hadoop source code directory

Related articles

hadoop AMOnlyLogAggregationPolicy source code

hadoop AMOrFailedContainerLogAggregationPolicy source code

hadoop AbstractContainerLogAggregationPolicy source code

hadoop AllContainerLogAggregationPolicy source code

hadoop AppLogAggregator source code

hadoop AppLogAggregatorImpl source code

hadoop FailedContainerLogAggregationPolicy source code

hadoop FailedOrKilledContainerLogAggregationPolicy source code

hadoop LimitSizeContainerLogAggregationPolicy source code

hadoop NoneContainerLogAggregationPolicy source code
