hadoop AbstractTimelineStorageReader 源码

  • 2022-10-20
  • 浏览 (247)

hadoop AbstractTimelineStorageReader 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
import org.apache.hadoop.yarn.webapp.NotFoundException;

/**
 * Common parent of the readers that fetch timeline data from the HBase
 * storage. It keeps the reader context and offers helpers to validate that
 * context and to fill in flow information that is missing from it.
 */
public abstract class AbstractTimelineStorageReader {

  /** Reader context handed in at construction time. */
  private final TimelineReaderContext readerContext;

  /** Accessor of the AppToFlow table, used to resolve the flow context. */
  private final AppToFlowTableRW flowLookupTable = new AppToFlowTableRW();

  /**
   * Creates a reader bound to the given context.
   *
   * @param ctxt the reader context this reader operates on.
   */
  public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
    this.readerContext = ctxt;
  }

  /**
   * @return the reader context passed to the constructor.
   */
  protected TimelineReaderContext getContext() {
    return readerContext;
  }

  /**
   * Fetches the row identified by the given key from the AppToFlow table and
   * builds the flow context (user id, flow name, flow run id) out of it.
   *
   * @param appToFlowRowKey row key of the AppToFlow table entry to fetch.
   * @param clusterId the cluster id, used as the column qualifier when the
   *     flow name, flow run id and user id columns are read.
   * @param hbaseConf HBase configuration.
   * @param conn HBase Connection.
   * @return the flow context stored for the row.
   * @throws IOException if any problem occurs while fetching flow information.
   * @throws NotFoundException if the row is missing or empty, or if any of
   *     flow name, flow run id and user id is absent from it.
   */
  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
      String clusterId, Configuration hbaseConf, Connection conn)
      throws IOException {
    Result row = flowLookupTable.getResult(
        hbaseConf, conn, new Get(appToFlowRowKey.getRowKey()));
    if (row == null || row.isEmpty()) {
      throw new NotFoundException(
          missingFlowContextMessage(clusterId, appToFlowRowKey));
    }

    Object name = ColumnRWHelper.readResult(
        row, AppToFlowColumnPrefix.FLOW_NAME, clusterId);
    Object runId = ColumnRWHelper.readResult(
        row, AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId);
    Object user = ColumnRWHelper.readResult(
        row, AppToFlowColumnPrefix.USER_ID, clusterId);
    if (name == null || user == null || runId == null) {
      throw new NotFoundException(
          missingFlowContextMessage(clusterId, appToFlowRowKey));
    }

    return new FlowContext(
        (String) user, (String) name, ((Number) runId).longValue());
  }

  /**
   * Builds the message reported when no complete flow context can be found.
   *
   * @param clusterId the cluster id that was looked up.
   * @param appToFlowRowKey the row key whose app id is reported.
   * @return the message text.
   */
  private static String missingFlowContextMessage(String clusterId,
      AppToFlowRowKey appToFlowRowKey) {
    return "Unable to find the context flow name, and flow run id, "
        + "and user id for clusterId=" + clusterId
        + ", appId=" + appToFlowRowKey.getAppId();
  }

  /**
   * Fills in parameters that were not provided. This implementation simply
   * delegates to {@link #defaultAugmentParams(Configuration, Connection)}.
   *
   * @param hbaseConf HBase Configuration.
   * @param conn HBase Connection.
   * @throws IOException if any exception is encountered while setting params.
   */
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    defaultAugmentParams(hbaseConf, conn);
  }

  /**
   * Default parameter augmentation: when the context lacks the flow name,
   * the flow run id or the user id, all three are looked up through the
   * AppToFlow table and written into the context.
   *
   * @param hbaseConf HBase Configuration.
   * @param conn HBase Connection.
   * @throws IOException if any exception is encountered while setting params.
   */
  protected final void defaultAugmentParams(Configuration hbaseConf,
      Connection conn) throws IOException {
    // In practice the three values are expected to be either all present or
    // all absent; a single missing one is enough to trigger the lookup.
    boolean flowContextKnown = readerContext.getFlowName() != null
        && readerContext.getFlowRunId() != null
        && readerContext.getUserId() != null;
    if (flowContextKnown) {
      return;
    }

    // Resolve the flow context from the AppToFlow table using the app id.
    FlowContext resolved = lookupFlowContext(
        new AppToFlowRowKey(readerContext.getAppId()),
        readerContext.getClusterId(), hbaseConf, conn);
    readerContext.setFlowName(resolved.flowName);
    readerContext.setFlowRunId(resolved.flowRunId);
    readerContext.setUserId(resolved.userId);
  }

  /**
   * Checks that the parameters required for reading entities are present.
   */
  protected abstract void validateParams();

  /**
   * Immutable holder of the user id, flow name and flow run id of a flow.
   */
  protected static class FlowContext {
    private final String userId;
    private final String flowName;
    private final Long flowRunId;

    /**
     * @param user the user id.
     * @param flowName the flow name.
     * @param flowRunId the flow run id.
     */
    public FlowContext(String user, String flowName, Long flowRunId) {
      this.userId = user;
      this.flowName = flowName;
      this.flowRunId = flowRunId;
    }

    /**
     * @return the user id.
     */
    protected String getUserId() {
      return userId;
    }

    /**
     * @return the flow name.
     */
    protected String getFlowName() {
      return flowName;
    }

    /**
     * @return the flow run id.
     */
    protected Long getFlowRunId() {
      return flowRunId;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ApplicationEntityReader 源码

hadoop EntityTypeReader 源码

hadoop FlowActivityEntityReader 源码

hadoop FlowRunEntityReader 源码

hadoop GenericEntityReader 源码

hadoop SubApplicationEntityReader 源码

hadoop TimelineEntityReader 源码

hadoop TimelineEntityReaderFactory 源码

hadoop package-info 源码

0  赞