hadoop FlowRunRowKey source code

  • 2022-10-20

hadoop FlowRunRowKey code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

/**
 * Represents a rowkey for the flow run table.
 */
public class FlowRunRowKey {
  private final String clusterId;
  private final String userId;
  private final String flowName;
  private final Long flowRunId;
  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
      new FlowRunRowKeyConverter();

  public FlowRunRowKey(String clusterId, String userId, String flowName,
      Long flowRunId) {
    this.clusterId = clusterId;
    this.userId = userId;
    this.flowName = flowName;
    this.flowRunId = flowRunId;
  }

  public String getClusterId() {
    return clusterId;
  }

  public String getUserId() {
    return userId;
  }

  public String getFlowName() {
    return flowName;
  }

  public Long getFlowRunId() {
    return flowRunId;
  }

  /**
   * Constructs a row key for the flow run table as follows:
   * {@code clusterId!userId!flowName!Inverted Flow Run Id}.
   *
   * @return byte array with the row key
   */
  public byte[] getRowKey() {
    return flowRunRowKeyConverter.encode(this);
  }


  /**
   * Given the raw row key as bytes, returns the row key as an object.
   * @param rowKey Byte representation of row key.
   * @return A <cite>FlowRunRowKey</cite> object.
   */
  public static FlowRunRowKey parseRowKey(byte[] rowKey) {
    return new FlowRunRowKeyConverter().decode(rowKey);
  }

  /**
   * Constructs a row key for the flow run table as follows:
   * {@code clusterId!userId!flowName!Flow Run Id}.
   * @return String representation of row key
   */
  public String getRowKeyAsString() {
    return flowRunRowKeyConverter.encodeAsString(this);
  }

  /**
   * Given the encoded row key as string, returns the row key as an object.
   * @param encodedRowKey String representation of row key.
   * @return A <cite>FlowRunRowKey</cite> object.
   */
  public static FlowRunRowKey parseRowKeyFromString(String encodedRowKey) {
    return new FlowRunRowKeyConverter().decodeFromString(encodedRowKey);
  }

  /**
   * Returns the flow run row key as a verbose String.
   * @return String
   */
  @Override
  public String toString() {
    StringBuilder flowKeyStr = new StringBuilder();
    flowKeyStr.append("{clusterId=" + clusterId)
        .append(" userId=" + userId)
        .append(" flowName=" + flowName)
        .append(" flowRunId=")
        .append(flowRunId)
        .append("}");
    return flowKeyStr.toString();
  }

  /**
   * Encodes and decodes row keys for the flow run table.
   * The row key is of the form: clusterId!userId!flowName!flowRunId.
   * flowRunId is a long and the rest are strings.
   */
  private static final class FlowRunRowKeyConverter implements
      KeyConverter<FlowRunRowKey>, KeyConverterToString<FlowRunRowKey> {

    private FlowRunRowKeyConverter() {
    }

    /**
     * The flow run row key is of the form clusterId!userId!flowName!flowrunId
     * with each segment separated by !. The sizes below indicate sizes of each
     * one of these segments in sequence. clusterId, userId and flowName are
     * strings. flowrunId is a long hence 8 bytes in size. Strings are variable
     * in size (i.e. end whenever separator is encountered). This is used while
     * decoding and helps in determining where to split.
     */
    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };

    /*
     * (non-Javadoc)
     *
     * Encodes a FlowRunRowKey object into a byte array with each component/field
     * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a flow
     * run row key of the form clusterId!userId!flowName!flowRunId. If flowRunId
     * in the passed FlowRunRowKey object is null (and the fields preceding it,
     * i.e. clusterId, userId and flowName, are not null), this returns a row key
     * prefix of the form clusterId!userId!flowName! flowRunId is inverted
     * while encoding as it helps maintain a descending order for flow run keys
     * in the flow run table.
     *
     * @see
     * org.apache.hadoop.yarn.server.timelineservice.storage.common
     * .KeyConverter#encode(java.lang.Object)
     */
    @Override
    public byte[] encode(FlowRunRowKey rowKey) {
      byte[] first =
          Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
              .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
                  Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
      if (rowKey.getFlowRunId() == null) {
        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
      } else {
        // Note that flowRunId is a long, so it cannot be encoded together
        // with the string components above.
        byte[] second =
            Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
        return Separator.QUALIFIERS.join(first, second);
      }
    }

    /*
     * (non-Javadoc)
     *
     * Decodes a flow run row key of the form
     * clusterId!userId!flowName!flowRunId represented in byte format and
     * converts it into a FlowRunRowKey object. flowRunId is inverted while
     * decoding as it was inverted while encoding.
     *
     * @see
     * org.apache.hadoop.yarn.server.timelineservice.storage.common
     * .KeyConverter#decode(byte[])
     */
    @Override
    public FlowRunRowKey decode(byte[] rowKey) {
      byte[][] rowKeyComponents =
          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
      if (rowKeyComponents.length != 4) {
        throw new IllegalArgumentException("the row key is not valid for "
            + "a flow run");
      }
      String clusterId =
          Separator.decode(Bytes.toString(rowKeyComponents[0]),
              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      String userId =
          Separator.decode(Bytes.toString(rowKeyComponents[1]),
              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      String flowName =
          Separator.decode(Bytes.toString(rowKeyComponents[2]),
              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      Long flowRunId =
          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
      return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
    }

    @Override
    public String encodeAsString(FlowRunRowKey key) {
      if (key.clusterId == null || key.userId == null || key.flowName == null
          || key.flowRunId == null) {
        throw new IllegalArgumentException();
      }
      return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
          key.clusterId, key.userId, key.flowName, key.flowRunId.toString()});
    }

    @Override
    public FlowRunRowKey decodeFromString(String encodedRowKey) {
      List<String> split = TimelineReaderUtils.split(encodedRowKey);
      if (split == null || split.size() != 4) {
        throw new IllegalArgumentException(
            "Invalid row key for flow run table.");
      }
      Long flowRunId = Long.valueOf(split.get(3));
      return new FlowRunRowKey(split.get(0), split.get(1), split.get(2),
          flowRunId);
    }
  }
}
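
The listing above is the complete class. As a quick orientation, the sketch below builds a key, encodes it both as bytes and as an escaped string, and parses each form back. It is a minimal illustration rather than part of the Hadoop sources: the cluster, user, flow and run-id values are made up, and it assumes the timelineservice-hbase-common classes are on the classpath.

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;

public class FlowRunRowKeyRoundTrip {
  public static void main(String[] args) {
    // Hypothetical identifiers; any non-null values work for the round trip.
    FlowRunRowKey key =
        new FlowRunRowKey("test-cluster", "alice", "daily-etl", 1466097809000L);

    // Byte form used as the actual HBase row key:
    // clusterId!userId!flowName!inverted flowRunId.
    byte[] rowKeyBytes = key.getRowKey();
    FlowRunRowKey decoded = FlowRunRowKey.parseRowKey(rowKeyBytes);
    System.out.println(decoded.getFlowName());   // daily-etl
    System.out.println(decoded.getFlowRunId());  // 1466097809000

    // Escaped string form (the run id is not inverted here), handy when the
    // key has to travel as text rather than as raw bytes.
    String asString = key.getRowKeyAsString();
    FlowRunRowKey fromString = FlowRunRowKey.parseRowKeyFromString(asString);
    System.out.println(fromString.getUserId());  // alice
  }
}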

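A second property worth illustrating: because encode() stores the run id through LongConverter.invertLong, a larger (more recent) run id yields a byte-wise smaller row key, so the newest run of a flow sorts first in an HBase scan. A minimal, hypothetical check of that ordering, under the same classpath assumptions (it also uses the HBase client's Bytes utility):

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;

public class FlowRunRowKeyOrdering {
  public static void main(String[] args) {
    // Two runs of the same flow; run id 2000 is more recent than 1000.
    byte[] older =
        new FlowRunRowKey("test-cluster", "alice", "daily-etl", 1000L).getRowKey();
    byte[] newer =
        new FlowRunRowKey("test-cluster", "alice", "daily-etl", 2000L).getRowKey();

    // The inverted run id makes the newer key compare as smaller, so it
    // would be returned first by a forward scan of the flow run table.
    System.out.println(Bytes.compareTo(newer, older) < 0);  // expected: true
  }
}
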
Related information

hadoop source code directory

Related articles

hadoop AggregationCompactionDimension source code

hadoop AggregationOperation source code

hadoop Attribute source code

hadoop FlowActivityColumnFamily source code

hadoop FlowActivityColumnPrefix source code

hadoop FlowActivityRowKey source code

hadoop FlowActivityRowKeyPrefix source code

hadoop FlowActivityTable source code

hadoop FlowRunColumn source code

hadoop FlowRunColumnFamily source code
