hadoop PerNodeTimelineCollectorsAuxService 源码

  • 2022-10-20
  • 浏览 (216)

hadoop PerNodeTimelineCollectorsAuxService 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.collector;

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerType;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The top-level server for the per-node timeline collector manager. Currently
 * it is defined as an auxiliary service to accommodate running within another
 * daemon (e.g. node manager).
 *
 * <p>An app level collector is registered when an application master container
 * is initialized. It is removed, after a configurable linger period, once the
 * last application master container tracked for that application has stopped.
 */
@Private
@Unstable
public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
  private static final Logger LOG =
      LoggerFactory.getLogger(PerNodeTimelineCollectorsAuxService.class);
  private static final int SHUTDOWN_HOOK_PRIORITY = 30;

  private final NodeTimelineCollectorManager collectorManager;
  // Delay, in milliseconds, between an AM container stopping and the removal
  // of its application's collector. Read from configuration in serviceInit().
  private long collectorLingerPeriod;
  // Runs the delayed collector removals; created in serviceInit(), so it is
  // null until the service has been initialized.
  private ScheduledExecutorService scheduler;
  // Application master containers currently tracked per application. The
  // HashSet values are not thread-safe, so every read-modify-write sequence
  // synchronizes on this map; it is final because it is used as the monitor.
  private final Map<ApplicationId, Set<ContainerId>> appIdToContainerId =
      new ConcurrentHashMap<>();

  /**
   * Creates the service backed by a new
   * {@code NodeTimelineCollectorManager(true)}.
   */
  public PerNodeTimelineCollectorsAuxService() {
    this(new NodeTimelineCollectorManager(true));
  }

  /**
   * Creates the service on top of the given collector manager.
   *
   * @param collectorsManager collector manager this service delegates to.
   */
  @VisibleForTesting PerNodeTimelineCollectorsAuxService(
      NodeTimelineCollectorManager collectorsManager) {
    super("timeline_collector");
    this.collectorManager = collectorsManager;
  }

  /**
   * Reads the collector linger period, creates the removal scheduler and
   * initializes the collector manager.
   *
   * @param conf configuration to initialize from.
   * @throws Exception if timeline service v2 is not enabled in {@code conf}
   *     (as a {@link YarnException}) or if initialization fails.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
      throw new YarnException(
          "Looks like timeline_collector is set as an auxillary service in "
              + YarnConfiguration.NM_AUX_SERVICES
              + ". But Timeline service v2 is not enabled,"
              + " so timeline_collector needs to be removed"
              + " from that list of auxillary services.");
    }
    collectorLingerPeriod =
        conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
            YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS);
    scheduler = Executors.newSingleThreadScheduledExecutor();
    collectorManager.init(conf);
    super.serviceInit(conf);
  }

  /**
   * Starts the collector manager.
   *
   * @throws Exception if the start fails.
   */
  @Override
  protected void serviceStart() throws Exception {
    collectorManager.start();
    super.serviceStart();
  }

  /**
   * Shuts down the removal scheduler, waiting up to the linger period for it
   * to terminate, and then stops the collector manager.
   *
   * <p>The collector manager and the superclass are stopped even when waiting
   * for the scheduler fails, so one failure does not prevent the remaining
   * shutdown steps.
   *
   * @throws Exception if any shutdown step fails.
   */
  @Override
  protected void serviceStop() throws Exception {
    try {
      // stop() may be called on a service whose serviceInit() never ran or
      // failed before the scheduler was created, so guard against null.
      if (scheduler != null) {
        scheduler.shutdown();
        if (!scheduler.awaitTermination(collectorLingerPeriod,
            TimeUnit.MILLISECONDS)) {
          LOG.warn(
              "Scheduler terminated before removing the application collectors");
        }
      }
    } finally {
      try {
        collectorManager.stop();
      } finally {
        super.serviceStop();
      }
    }
  }

  // these methods can be used as the basis for future service methods if the
  // per-node collector runs separate from the node manager
  /**
   * Creates and adds an app level collector for the specified application id.
   * The collector is also initialized and started. If the service already
   * exists, no new service is created.
   *
   * @param appId Application Id to be added.
   * @param user Application Master container user.
   * @return whether it was added successfully
   */
  public boolean addApplicationIfAbsent(ApplicationId appId, String user) {
    AppLevelTimelineCollector collector =
        new AppLevelTimelineCollectorWithAgg(appId, user);
    // Identity comparison on purpose: the add succeeded only if the manager
    // handed back the very instance that was just created.
    return (collectorManager.putIfAbsent(appId, collector)
        == collector);
  }

  /**
   * Removes the app level collector for the specified application id. The
   * collector is also stopped as a result. If the collector does not exist, no
   * change is made.
   *
   * @param appId Application Id to be removed.
   * @return whether it was removed successfully
   */
  public boolean removeApplication(ApplicationId appId) {
    return collectorManager.remove(appId);
  }

  /**
   * Creates and adds an app level collector for the specified application id.
   * The collector is also initialized and started. If the collector already
   * exists, no new collector is created.
   *
   * <p>Only application master containers are considered; other container
   * types are ignored.
   *
   * @param context initialization context of the container.
   */
  @Override
  public void initializeContainer(ContainerInitializationContext context) {
    // intercept the event of the AM container being created and initialize the
    // app level collector service
    if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
      ApplicationId appId = context.getContainerId().
          getApplicationAttemptId().getApplicationId();
      synchronized (appIdToContainerId) {
        Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
        if (masterContainers == null) {
          masterContainers = new HashSet<>();
          appIdToContainerId.put(appId, masterContainers);
        }
        masterContainers.add(context.getContainerId());
      }
      addApplicationIfAbsent(appId, context.getUser());
    }
  }

  /**
   * Removes the app level collector for the specified application id. The
   * collector is also stopped as a result. If the collector does not exist, no
   * change is made.
   *
   * <p>Only application master containers are considered, and the removal is
   * scheduled to run after the linger period rather than performed inline.
   *
   * @param context termination context of the container.
   */
  @Override
  public void stopContainer(ContainerTerminationContext context) {
    // intercept the event of the AM container being stopped and remove the app
    // level collector service
    if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
      final ContainerId containerId = context.getContainerId();
      removeApplicationCollector(containerId);
    }
  }

  /**
   * Schedules, after the collector linger period, the removal of the given
   * application master container from the tracked set. When that container
   * was the last one tracked for its application, the application's collector
   * is removed as well.
   *
   * @param containerId application master container that has stopped.
   * @return future of the scheduled removal task.
   */
  @VisibleForTesting
  protected Future<?> removeApplicationCollector(
      final ContainerId containerId) {
    final ApplicationId appId =
        containerId.getApplicationAttemptId().getApplicationId();
    return scheduler.schedule(new Runnable() {
      @Override
      public void run() {
        boolean shouldRemoveApplication = false;
        synchronized (appIdToContainerId) {
          Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
          if (masterContainers == null) {
            LOG.info("Stop container for {}"
                + " is called before initializing container.", containerId);
            return;
          }
          masterContainers.remove(containerId);
          if (masterContainers.isEmpty()) {
            // remove only if it is last master container
            shouldRemoveApplication = true;
            appIdToContainerId.remove(appId);
          }
        }

        // Performed outside the lock so the monitor is not held while the
        // collector manager removes the collector.
        if (shouldRemoveApplication) {
          removeApplication(appId);
        }
      }
    }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
  }

  /**
   * Tells whether the collector manager holds a collector for the application.
   *
   * @param appId Application Id to look up.
   * @return whether a timeline collector exists for {@code appId}.
   */
  @VisibleForTesting
  boolean hasApplication(ApplicationId appId) {
    return collectorManager.containsTimelineCollector(appId);
  }

  /** No-op: collectors are driven by container events, not this callback. */
  @Override
  public void initializeApplication(ApplicationInitializationContext context) {
  }

  /** No-op: collectors are driven by container events, not this callback. */
  @Override
  public void stopApplication(ApplicationTerminationContext context) {
  }

  /**
   * Returns the service metadata, which is currently an empty buffer.
   *
   * @return a newly allocated zero-length buffer.
   */
  @Override
  public ByteBuffer getMetaData() {
    // TODO currently it is not used; we can return a more meaningful data when
    // we connect it with an AM
    return ByteBuffer.allocate(0);
  }

  /**
   * Creates, initializes and starts the service, registering a shutdown hook
   * that stops it. Any failure is logged and handed to
   * {@link ExitUtil#terminate(int, String)}.
   *
   * @param args command line arguments, used for the startup message only.
   * @param collectorManager collector manager to use; when {@code null} a new
   *     {@code NodeTimelineCollectorManager(false)} is created.
   * @param conf configuration to initialize the service with.
   * @return the launched service; may be {@code null} or not fully started
   *     if startup failed and the terminate call returned.
   */
  @VisibleForTesting
  public static PerNodeTimelineCollectorsAuxService
      launchServer(String[] args, NodeTimelineCollectorManager collectorManager,
      Configuration conf) {
    Thread
      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    StringUtils.startupShutdownMessage(
        PerNodeTimelineCollectorsAuxService.class, args, LOG);
    PerNodeTimelineCollectorsAuxService auxService = null;
    try {
      auxService = collectorManager == null ?
          new PerNodeTimelineCollectorsAuxService(
              new NodeTimelineCollectorManager(false)) :
          new PerNodeTimelineCollectorsAuxService(collectorManager);
      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
          SHUTDOWN_HOOK_PRIORITY);
      auxService.init(conf);
      auxService.start();
    } catch (Throwable t) {
      LOG.error("Error starting PerNodeTimelineCollectorServer", t);
      ExitUtil.terminate(-1, "Error starting PerNodeTimelineCollectorServer");
    }
    return auxService;
  }

  /** Shutdown hook that stops the wrapped service. */
  private static class ShutdownHook implements Runnable {
    private final PerNodeTimelineCollectorsAuxService auxService;

    public ShutdownHook(PerNodeTimelineCollectorsAuxService auxService) {
      this.auxService = auxService;
    }

    @Override
    public void run() {
      auxService.stop();
    }
  }

  /**
   * Launches the service standalone with the timeline service enabled and its
   * version set to 2.0.
   *
   * @param args command line arguments.
   */
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    launchServer(args, null, conf);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AppLevelTimelineCollector 源码

hadoop AppLevelTimelineCollectorWithAgg 源码

hadoop NodeTimelineCollectorManager 源码

hadoop TimelineCollector 源码

hadoop TimelineCollectorContext 源码

hadoop TimelineCollectorManager 源码

hadoop TimelineCollectorWebService 源码

hadoop package-info 源码

0  赞