hadoop TimelineCollectorManager 源码

  • 2022-10-20
  • 浏览 (221)

haddop TimelineCollectorManager 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.collector;


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that manages adding and removing collectors and their lifecycle. It
 * provides thread safety access to the collectors inside.
 *
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TimelineCollectorManager extends CompositeService {
  private static final Logger LOG =
      LoggerFactory.getLogger(TimelineCollectorManager.class);

  private TimelineWriter writer;
  private ScheduledExecutorService writerFlusher;
  private int flushInterval;
  private boolean writerFlusherRunning;

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    writer = createTimelineWriter(conf);
    writer.init(conf);
    // create a single dedicated thread for flushing the writer on a periodic
    // basis
    writerFlusher = Executors.newSingleThreadScheduledExecutor();
    flushInterval = conf.getInt(
        YarnConfiguration.
        TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
        YarnConfiguration.
        DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
    super.serviceInit(conf);
  }

  private TimelineWriter createTimelineWriter(final Configuration conf) {
    String timelineWriterClassName = conf.get(
        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_CLASS);
    LOG.info("Using TimelineWriter: " + timelineWriterClassName);
    try {
      Class<?> timelineWriterClazz = Class.forName(timelineWriterClassName);
      if (TimelineWriter.class.isAssignableFrom(timelineWriterClazz)) {
        return (TimelineWriter) ReflectionUtils.newInstance(
            timelineWriterClazz, conf);
      } else {
        throw new YarnRuntimeException("Class: " + timelineWriterClassName
            + " not instance of " + TimelineWriter.class.getCanonicalName());
      }
    } catch (ClassNotFoundException e) {
      throw new YarnRuntimeException("Could not instantiate TimelineWriter: "
          + timelineWriterClassName, e);
    }
  }

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    if (writer != null) {
      writer.start();
    }
    // schedule the flush task
    writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
        flushInterval, flushInterval, TimeUnit.SECONDS);
    writerFlusherRunning = true;
  }

  // access to this map is synchronized with the map itself
  private final Map<ApplicationId, TimelineCollector> collectors =
      Collections.synchronizedMap(
          new HashMap<ApplicationId, TimelineCollector>());

  public TimelineCollectorManager(String name) {
    super(name);
  }

  protected TimelineWriter getWriter() {
    return writer;
  }

  /**
   * Put the collector into the collection if an collector mapped by id does
   * not exist.
   *
   * @param appId Application Id for which collector needs to be put.
   * @param collector timeline collector to be put.
   * @throws YarnRuntimeException if there  was any exception in initializing
   *                              and starting the app level service
   * @return the collector associated with id after the potential put.
   */
  public TimelineCollector putIfAbsent(ApplicationId appId,
      TimelineCollector collector) {
    TimelineCollector collectorInTable = null;
    synchronized (collectors) {
      collectorInTable = collectors.get(appId);
      if (collectorInTable == null) {
        try {
          // initialize, start, and add it to the collection so it can be
          // cleaned up when the parent shuts down
          collector.init(getConfig());
          collector.setWriter(writer);
          collector.start();
          collectors.put(appId, collector);
          LOG.info("the collector for " + appId + " was added");
          collectorInTable = collector;
          postPut(appId, collectorInTable);
        } catch (Exception e) {
          throw new YarnRuntimeException(e);
        }
      } else {
        LOG.info("the collector for " + appId + " already exists!");
      }
    }
    return collectorInTable;
  }

  /**
   * Callback handler for the timeline collector manager when a collector has
   * been added into the collector map.
   * @param appId Application id of the collector.
   * @param collector The actual timeline collector that has been added.
   */
  public void postPut(ApplicationId appId, TimelineCollector collector) {
    doPostPut(appId, collector);
    collector.setReadyToAggregate();
  }

  /**
   * A template method that will be called by
   * {@link  #postPut(ApplicationId, TimelineCollector)}.
   * @param appId Application id of the collector.
   * @param collector The actual timeline collector that has been added.
   */
  protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
  }

  /**
   * Removes the collector for the specified id. The collector is also stopped
   * as a result. If the collector does not exist, no change is made.
   *
   * @param appId Application Id to remove.
   * @return whether it was removed successfully
   */
  public boolean remove(ApplicationId appId) {
    TimelineCollector collector = collectors.remove(appId);
    if (collector == null) {
      LOG.error("the collector for " + appId + " does not exist!");
    } else {
      synchronized (collector) {
        postRemove(appId, collector);
        // stop the service to do clean up
        collector.stop();
      }
      LOG.info("The collector service for " + appId + " was removed");
    }
    return collector != null;
  }

  protected void postRemove(ApplicationId appId, TimelineCollector collector) {

  }

  /**
   * Returns the collector for the specified id.
   *
   * @param appId Application Id for which we need to get the collector.
   * @return the collector or null if it does not exist
   */
  public TimelineCollector get(ApplicationId appId) {
    return collectors.get(appId);
  }

  /**
   * Returns whether the collector for the specified id exists in this
   * collection.
   * @param appId Application Id.
   * @return true if collector for the app id is found, false otherwise.
   */
  public boolean containsTimelineCollector(ApplicationId appId) {
    return collectors.containsKey(appId);
  }

  @Override
  protected void serviceStop() throws Exception {
    if (collectors != null && collectors.size() > 0) {
      synchronized (collectors) {
        for (TimelineCollector c : collectors.values()) {
          c.serviceStop();
        }
      }
    }
    // stop the flusher first
    if (writerFlusher != null) {
      writerFlusher.shutdown();
      writerFlusherRunning = false;
      if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
        // in reality it should be ample time for the flusher task to finish
        // even if it times out, writers may be able to handle closing in this
        // situation fine
        // proceed to close the writer
        LOG.warn("failed to stop the flusher task in time. " +
            "will still proceed to close the writer.");
      }
    }
    if (writer != null) {
      writer.close();
    }
    super.serviceStop();
  }

  @VisibleForTesting
  boolean writerFlusherRunning() {
    return writerFlusherRunning;
  }

  /**
   * Task that invokes the flush operation on the timeline writer.
   */
  private static class WriterFlushTask implements Runnable {
    private final TimelineWriter writer;

    public WriterFlushTask(TimelineWriter writer) {
      this.writer = writer;
    }

    public void run() {
      try {
        // synchronize on the writer object to avoid flushing timeline
        // entities placed on the buffer by synchronous putEntities
        // requests.
        synchronized (writer) {
          writer.flush();
        }
      } catch (Throwable th) {
        // we need to handle all exceptions or subsequent execution may be
        // suppressed
        LOG.error("exception during timeline writer flush!", th);
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AppLevelTimelineCollector 源码

hadoop AppLevelTimelineCollectorWithAgg 源码

hadoop NodeTimelineCollectorManager 源码

hadoop PerNodeTimelineCollectorsAuxService 源码

hadoop TimelineCollector 源码

hadoop TimelineCollectorContext 源码

hadoop TimelineCollectorWebService 源码

hadoop package-info 源码

0  赞