hadoop PacketHeader 源码

  • 2022-10-20
  • 浏览 (95)

haddop PacketHeader 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocol.datatransfer;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Shorts;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;

/**
 * Header data for each packet that goes through the read/write pipelines.
 * Includes all of the information about the packet, excluding checksums and
 * actual data.
 *
 * This data includes:
 *  - the offset in bytes into the HDFS block of the data in this packet
 *  - the sequence number of this packet in the pipeline
 *  - whether or not this is the last packet in the pipeline
 *  - the length of the data in this packet
 *  - whether or not this packet should be synced by the DNs.
 *
 * When serialized, this header is written out as a protocol buffer, preceded
 * by a 4-byte integer representing the full packet length, and a 2-byte short
 * representing the header length.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class PacketHeader {
  private static final int MAX_PROTO_SIZE = PacketHeaderProto.newBuilder()
      .setOffsetInBlock(0)
      .setSeqno(0)
      .setLastPacketInBlock(false)
      .setDataLen(0)
      .setSyncBlock(false)
      .build().getSerializedSize();
  public static final int PKT_LENGTHS_LEN =
      Ints.BYTES + Shorts.BYTES;
  public static final int PKT_MAX_HEADER_LEN =
      PKT_LENGTHS_LEN + MAX_PROTO_SIZE;

  private int packetLen;
  private PacketHeaderProto proto;

  public PacketHeader() {
  }

  public PacketHeader(int packetLen, long offsetInBlock, long seqno,
                      boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
    this.packetLen = packetLen;
    Preconditions.checkArgument(packetLen >= Ints.BYTES,
        "packet len %s should always be at least 4 bytes",
        packetLen);

    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
        .setOffsetInBlock(offsetInBlock)
        .setSeqno(seqno)
        .setLastPacketInBlock(lastPacketInBlock)
        .setDataLen(dataLen);

    if (syncBlock) {
      // Only set syncBlock if it is specified.
      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
      // because it changes the length of the packet header, and BlockReceiver
      // in that version did not support variable-length headers.
      builder.setSyncBlock(true);
    }

    proto = builder.build();
  }

  public int getDataLen() {
    return proto.getDataLen();
  }

  public boolean isLastPacketInBlock() {
    return proto.getLastPacketInBlock();
  }

  public long getSeqno() {
    return proto.getSeqno();
  }

  public long getOffsetInBlock() {
    return proto.getOffsetInBlock();
  }

  public int getPacketLen() {
    return packetLen;
  }

  public boolean getSyncBlock() {
    return proto.getSyncBlock();
  }

  @Override
  public String toString() {
    return "PacketHeader with packetLen=" + packetLen +
      " header data: " +
      proto.toString();
  }

  public void setFieldsFromData(
      int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
    this.packetLen = packetLen;
    proto = PacketHeaderProto.parseFrom(headerData);
  }

  public void readFields(ByteBuffer buf) throws IOException {
    packetLen = buf.getInt();
    short protoLen = buf.getShort();
    byte[] data = new byte[protoLen];
    buf.get(data);
    proto = PacketHeaderProto.parseFrom(data);
  }

  public void readFields(DataInputStream in) throws IOException {
    this.packetLen = in.readInt();
    short protoLen = in.readShort();
    byte[] data = new byte[protoLen];
    in.readFully(data);
    proto = PacketHeaderProto.parseFrom(data);
  }

  /**
   * @return the number of bytes necessary to write out this header,
   * including the length-prefixing of the payload and header
   */
  public int getSerializedSize() {
    return PKT_LENGTHS_LEN + proto.getSerializedSize();
  }

  /**
   * Write the header into the buffer.
   * This requires that PKT_HEADER_LEN bytes are available.
   */
  public void putInBuffer(final ByteBuffer buf) {
    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
      : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
    try {
      buf.putInt(packetLen);
      buf.putShort((short) proto.getSerializedSize());
      proto.writeTo(new ByteBufferOutputStream(buf));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void write(DataOutputStream out) throws IOException {
    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
    : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
    out.writeInt(packetLen);
    out.writeShort(proto.getSerializedSize());
    proto.writeTo(out);
  }

  public byte[] getBytes() {
    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
    putInBuffer(buf);
    return buf.array();
  }

  /**
   * Perform a sanity check on the packet, returning true if it is sane.
   * @param lastSeqNo the previous sequence number received - we expect the
   *                  current sequence number to be larger by 1.
   */
  public boolean sanityCheck(long lastSeqNo) {
    // We should only have a non-positive data length for the last packet
    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
    // The last packet should not contain data
    if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
    // Seqnos should always increase by 1 with each packet received
    return proto.getSeqno() == lastSeqNo + 1;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof PacketHeader)) return false;
    PacketHeader other = (PacketHeader)o;
    return this.proto.equals(other.proto);
  }

  @Override
  public int hashCode() {
    return (int)proto.getSeqno();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BlockConstructionStage 源码

hadoop BlockPinningException 源码

hadoop DataTransferProtoUtil 源码

hadoop DataTransferProtocol 源码

hadoop IOStreamPair 源码

hadoop InvalidEncryptionKeyException 源码

hadoop Op 源码

hadoop PacketReceiver 源码

hadoop PipelineAck 源码

hadoop ReplaceDatanodeOnFailure 源码

0  赞