hadoop TimelineCollector Source Code

  • 2022-10-20
  • Views (209)

hadoop TimelineCollector code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.collector;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that accepts writes to the timeline service and persists them to
 * the backing storage.
 *
 * Classes that extend this can add their own lifecycle management or
 * customization of request handling.
 */
@Private
@Unstable
public abstract class TimelineCollector extends CompositeService {

  private static final Logger LOG =
      LoggerFactory.getLogger(TimelineCollector.class);
  public static final String SEPARATOR = "_";

  private TimelineWriter writer;
  private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
      = new ConcurrentHashMap<>();
  private static Set<String> entityTypesSkipAggregation
      = new HashSet<>();
  private ThreadPoolExecutor pool;

  private volatile boolean readyToAggregate = false;

  private volatile boolean isStopped = false;

  private int maxWriteRetries;
  private long writeRetryInterval;

  public TimelineCollector(String name) {
    super(name);
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    int capacity = conf.getInt(
        YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY
    );
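    // One worker thread and a bounded queue; when the queue fills up,
    // DiscardOldestPolicy silently drops the oldest pending async write
    // instead of blocking or failing the caller.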
    pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(capacity));
    pool.setRejectedExecutionHandler(
        new ThreadPoolExecutor.DiscardOldestPolicy());

    maxWriteRetries =
        conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
    writeRetryInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
  }

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    isStopped = true;
    pool.shutdownNow();
    super.serviceStop();
  }

  boolean isStopped() {
    return isStopped;
  }

  protected void setWriter(TimelineWriter w) {
    this.writer = w;
  }

  protected Map<String, AggregationStatusTable> getAggregationGroups() {
    return aggregationGroups;
  }

  protected void setReadyToAggregate() {
    readyToAggregate = true;
  }

  protected boolean isReadyToAggregate() {
    return readyToAggregate;
  }

  /**
   * Decides the set of timeline entity types the collector should skip
   * during aggregation. Subclasses may override this method to customize
   * their own behavior.
   *
   * @return A set of strings consisting of all types the collector should
   *     skip.
   */
  protected Set<String> getEntityTypesSkipAggregation() {
    return entityTypesSkipAggregation;
  }

  public abstract TimelineCollectorContext getTimelineEntityContext();

  /**
   * Handles entity writes. These writes are synchronous and are written to the
   * backing storage without buffering/batching. If any entity already exists,
   * it results in an update of the entity.
   *
   * This method should be reserved for selected critical entities and events.
   * For normal voluminous writes one should use the async method
   * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
   *
   * @param entities entities to post
   * @param callerUgi the caller UGI
   * @return the response that contains the result of the post.
   * @throws IOException if there is any exception encountered while putting
   *     entities.
   */
  public TimelineWriteResponse putEntities(TimelineEntities entities,
      UserGroupInformation callerUgi) throws IOException {
    LOG.debug("putEntities(entities={}, callerUgi={})", entities, callerUgi);

    TimelineWriteResponse response = null;
    try {
      boolean isStorageUp = checkRetryWithSleep();
      if (isStorageUp) {
        // synchronize on the writer object so that no other threads can
        // flush the writer buffer concurrently and swallow any exception
        // caused by the timeline entities that are being put here.
        synchronized (writer) {
          response = writeTimelineEntities(entities, callerUgi);
          flushBufferedTimelineEntities();
        }
      } else {
        String msg = String.format("Failed to putEntities(" +
            "entities=%s, callerUgi=%s) as Timeline Storage is Down",
            entities, callerUgi);
        throw new IOException(msg);
      }
    } catch (InterruptedException ex) {
      String msg = String.format("Interrupted while retrying to putEntities(" +
          "entities=%s, callerUgi=%s)", entities, callerUgi);
      throw new IOException(msg, ex);
    }

    return response;
  }


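  // Polls the writer's health status up to maxWriteRetries times, sleeping
  // writeRetryInterval ms between probes; returns true as soon as the
  // backing storage reports RUNNING, false once the retries are exhausted.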
  private boolean checkRetryWithSleep() throws InterruptedException {
    int retries = maxWriteRetries;
    while (retries > 0) {
      TimelineHealth timelineHealth = writer.getHealthStatus();
      if (timelineHealth.getHealthStatus().equals(
              TimelineHealth.TimelineHealthStatus.RUNNING)) {
        return true;
      } else {
        try {
          Thread.sleep(writeRetryInterval);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          throw ex;
        }
        retries--;
      }
    }
    return false;
  }

  /**
   * Add or update a domain. If the domain already exists, only the owner
   * and the admin can update it.
   *
   * @param domain    domain to post
   * @param callerUgi the caller UGI
   * @return the response that contains the result of the post.
   * @throws IOException if there is any exception encountered while putting
   *                     domain.
   */
  public TimelineWriteResponse putDomain(TimelineDomain domain,
      UserGroupInformation callerUgi) throws IOException {
    LOG.debug("putDomain(domain={}, callerUgi={})", domain, callerUgi);

    TimelineWriteResponse response = null;
    try {
      boolean isStorageUp = checkRetryWithSleep();
      if (isStorageUp) {
        synchronized (writer) {
          final TimelineCollectorContext context = getTimelineEntityContext();
          response = writer.write(context, domain);
          flushBufferedTimelineEntities();
        }
      } else {
        String msg = String.format("Failed to putDomain(" +
            "domain=%s, callerUgi=%s) as Timeline Storage is Down",
            domain, callerUgi);
        throw new IOException(msg);
      }
    } catch (InterruptedException ex) {
      String msg = String.format("Interrupted while retrying to putDomain(" +
          "domain=%s, callerUgi=%s)", domain, callerUgi);
      throw new IOException(msg, ex);
    }

    return response;
  }

  private TimelineWriteResponse writeTimelineEntities(
      TimelineEntities entities, UserGroupInformation callerUgi)
      throws IOException {
    // Update application metrics for aggregation
    updateAggregateStatus(entities, aggregationGroups,
        getEntityTypesSkipAggregation());

    final TimelineCollectorContext context = getTimelineEntityContext();
    return writer.write(context, entities, callerUgi);
  }

  /**
   * Flush buffered timeline entities, if any.
   * @throws IOException if there is any exception encountered while
   *      flushing buffered entities.
   */
  private void flushBufferedTimelineEntities() throws IOException {
    writer.flush();
  }

  /**
   * Handles entity writes in an asynchronous manner. The method returns as soon
   * as validation is done. No promises are made on how quickly it will be
   * written to the backing storage or if it will always be written to the
   * backing storage. Multiple writes to the same entities may be batched and
   * appropriate values updated and result in fewer writes to the backing
   * storage.
   *
   * @param entities entities to post
   * @param callerUgi the caller UGI
   * @throws IOException if there is any exception encountered while putting
   *     entities.
   */
  public void putEntitiesAsync(TimelineEntities entities,
      UserGroupInformation callerUgi) throws IOException {
    LOG.debug("putEntitiesAsync(entities={}, callerUgi={})", entities,
        callerUgi);

    pool.execute(new Runnable() {
      @Override public void run() {
        try {
          writeTimelineEntities(entities, callerUgi);
        } catch (IOException ie) {
          LOG.error("Got an exception while writing entity", ie);
        }
      }
    });
  }

  /**
   * Aggregate all metrics in the given timeline entities, starting from no
   * predefined state.
   *
   * @param entities Entities to aggregate
   * @param resultEntityId Id of the result entity
   * @param resultEntityType Type of the result entity
   * @param needsGroupIdInResult Marks if we want the aggregation group id in
   *                             each aggregated metric.
   * @return A timeline entity that contains all aggregated TimelineMetric.
   */
  public static TimelineEntity aggregateEntities(
      TimelineEntities entities, String resultEntityId,
      String resultEntityType, boolean needsGroupIdInResult) {
    ConcurrentMap<String, AggregationStatusTable> aggregationGroups
        = new ConcurrentHashMap<>();
    updateAggregateStatus(entities, aggregationGroups, null);
    if (needsGroupIdInResult) {
      return aggregate(aggregationGroups, resultEntityId, resultEntityType);
    } else {
      return aggregateWithoutGroupId(
          aggregationGroups, resultEntityId, resultEntityType);
    }
  }

  /**
   * Update the aggregation status table for a timeline collector.
   *
   * @param entities Entities to update
   * @param aggregationGroups Aggregation status table
   * @param typesToSkip Entity types that we can safely assume to skip updating
   */
  static void updateAggregateStatus(
      TimelineEntities entities,
      ConcurrentMap<String, AggregationStatusTable> aggregationGroups,
      Set<String> typesToSkip) {
    for (TimelineEntity e : entities.getEntities()) {
      if ((typesToSkip != null && typesToSkip.contains(e.getType()))
          || e.getMetrics().isEmpty()) {
        continue;
      }
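      // Race-safe get-or-create: if another thread installs a table for this
      // type first, putIfAbsent returns that existing table and we reuse it.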
      AggregationStatusTable aggrTable = aggregationGroups.get(e.getType());
      if (aggrTable == null) {
        AggregationStatusTable table = new AggregationStatusTable();
        aggrTable = aggregationGroups.putIfAbsent(e.getType(),
            table);
        if (aggrTable == null) {
          aggrTable = table;
        }
      }
      aggrTable.update(e);
    }
  }

  /**
   * Aggregate internal status and generate timeline entities for the
   * aggregation results.
   *
   * @param aggregationGroups Aggregation status table
   * @param resultEntityId Id of the result entity
   * @param resultEntityType Type of the result entity
   * @return A timeline entity that contains all aggregated TimelineMetric.
   */
  static TimelineEntity aggregate(
      Map<String, AggregationStatusTable> aggregationGroups,
      String resultEntityId, String resultEntityType) {
    TimelineEntity result = new TimelineEntity();
    result.setId(resultEntityId);
    result.setType(resultEntityType);
    for (Map.Entry<String, AggregationStatusTable> entry
        : aggregationGroups.entrySet()) {
      entry.getValue().aggregateAllTo(result, entry.getKey());
    }
    return result;
  }

  /**
   * Aggregate internal status and generate timeline entities for the
   * aggregation results. The result metrics will not have aggregation group
   * information.
   *
   * @param aggregationGroups Aggregation status table
   * @param resultEntityId Id of the result entity
   * @param resultEntityType Type of the result entity
   * @return A timeline entity that contains all aggregated TimelineMetric.
   */
  static TimelineEntity aggregateWithoutGroupId(
      Map<String, AggregationStatusTable> aggregationGroups,
      String resultEntityId, String resultEntityType) {
    TimelineEntity result = new TimelineEntity();
    result.setId(resultEntityId);
    result.setType(resultEntityType);
    for (Map.Entry<String, AggregationStatusTable> entry
        : aggregationGroups.entrySet()) {
      entry.getValue().aggregateAllTo(result, "");
    }
    return result;
  }

  // Note: In memory aggregation is performed in an eventually consistent
  // fashion.
  protected static class AggregationStatusTable {
    // On aggregation, for each metric, aggregate all per-entity accumulated
    // metrics. We only use the id and type for TimelineMetrics in the key set
    // of this table.
    private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>>
        aggregateTable;

    public AggregationStatusTable() {
      aggregateTable = new ConcurrentHashMap<>();
    }

    public void update(TimelineEntity incoming) {
      String entityId = incoming.getId();
      for (TimelineMetric m : incoming.getMetrics()) {
        // Skip if the metric does not need aggregation
        if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
          continue;
        }
        // Update aggregateTable
        Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
        if (aggrRow == null) {
          Map<String, TimelineMetric> tempRow = new HashMap<>();
          aggrRow = aggregateTable.putIfAbsent(m, tempRow);
          if (aggrRow == null) {
            aggrRow = tempRow;
          }
        }
        synchronized (aggrRow) {
          aggrRow.put(entityId, m);
        }
      }
    }

    public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e,
        String aggregationGroupId) {
      if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
        return e;
      }
      Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric);
      if (aggrRow != null) {
        TimelineMetric aggrMetric = new TimelineMetric();
        if (aggregationGroupId.length() > 0) {
          aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId);
        } else {
          aggrMetric.setId(metric.getId());
        }
        aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
        Map<Object, Object> status = new HashMap<>();
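        // Rows are plain HashMaps, so access is serialized by locking the
        // row itself; update() synchronizes on the same row when writing.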
        synchronized (aggrRow) {
          for (TimelineMetric m : aggrRow.values()) {
            TimelineMetric.aggregateTo(m, aggrMetric, status);
            // getRealtimeAggregationOp returns an enum so we can directly
            // compare with "!=".
            if (m.getRealtimeAggregationOp()
                != aggrMetric.getRealtimeAggregationOp()) {
              aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
            }
          }
          aggrRow.clear();
        }
        Set<TimelineMetric> metrics = e.getMetrics();
        metrics.remove(aggrMetric);
        metrics.add(aggrMetric);
      }
      return e;
    }

    public TimelineEntity aggregateAllTo(TimelineEntity e,
        String aggregationGroupId) {
      for (TimelineMetric m : aggregateTable.keySet()) {
        aggregateTo(m, e, aggregationGroupId);
      }
      return e;
    }
  }
}
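
The only abstract method a concrete collector must implement is getTimelineEntityContext(); the writer itself is normally injected by a TimelineCollectorManager through the protected setWriter() call. Below is a minimal sketch of a hypothetical subclass (the class name and constructor are illustrative, not part of Hadoop; the real in-tree subclass is AppLevelTimelineCollector):

// Hypothetical minimal subclass -- illustrative only, not part of Hadoop.
// It merely supplies the TimelineCollectorContext that tells the writer
// which cluster and application the incoming entities belong to.
class SketchTimelineCollector extends TimelineCollector {
  private final TimelineCollectorContext context =
      new TimelineCollectorContext();

  SketchTimelineCollector(String clusterId, String appId) {
    super(SketchTimelineCollector.class.getName());
    context.setClusterId(clusterId);
    context.setAppId(appId);
  }

  @Override
  public TimelineCollectorContext getTimelineEntityContext() {
    return context;
  }
}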

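The static aggregateEntities() helper can also be exercised on its own. A minimal sketch, assuming two container entities that each report a SINGLE_VALUE "MEMORY" metric with the SUM real-time aggregation op (the "YARN_CONTAINER" type and the ids are illustrative): since aggregation groups are keyed by entity type and the group id is joined to the metric id with SEPARATOR, the result entity should carry one metric named "MEMORY_YARN_CONTAINER" whose value is the sum of the two latest readings, 2048.

TimelineEntities entities = new TimelineEntities();
for (String id : new String[] {"container_1", "container_2"}) {
  TimelineEntity entity = new TimelineEntity();
  entity.setId(id);
  entity.setType("YARN_CONTAINER");
  TimelineMetric memory = new TimelineMetric(); // defaults to SINGLE_VALUE
  memory.setId("MEMORY");
  memory.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
  memory.addValue(System.currentTimeMillis(), 1024L);
  entity.addMetric(memory);
  entities.addEntity(entity);
}
// needsGroupIdInResult=true keeps the entity type as a suffix on the
// aggregated metric id; passing false would yield plain "MEMORY".
TimelineEntity aggregated = TimelineCollector.aggregateEntities(
    entities, "application_1", "YARN_APPLICATION", true);
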
Related Information

hadoop source code directory

Related Articles

hadoop AppLevelTimelineCollector Source Code

hadoop AppLevelTimelineCollectorWithAgg Source Code

hadoop NodeTimelineCollectorManager Source Code

hadoop PerNodeTimelineCollectorsAuxService Source Code

hadoop TimelineCollectorContext Source Code

hadoop TimelineCollectorManager Source Code

hadoop TimelineCollectorWebService Source Code

hadoop package-info Source Code
