// Apache Spark: VariableLengthRowBasedKeyValueBatch source
// File path: /sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
/**
 * An implementation of `RowBasedKeyValueBatch` in which key-value records have variable lengths.
 *
 *  The format for each record looks like this (in case of uaoSize = 4):
 * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen]
 * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen]
 * [8 bytes pointer to next]
 * Thus, record length = 4 + 4 + klen + vlen + 8
 */
public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatch {
  // full addresses for key rows and value rows
  // keyOffsets[i] is the absolute offset of row i's key UnsafeRow bytes,
  // i.e. record start + 2 * uaoSize (skipping the two length prefixes).
  private final long[] keyOffsets;
  /**
   * Append a key value pair.
   * It copies data into the backing MemoryBlock.
   * Returns an UnsafeRow pointing to the value if succeeds, otherwise returns null.
   *
   * Record layout written here (see class comment):
   * [total size][key size][key bytes (klen)][value bytes (vlen)][8-byte next pointer]
   *
   * @param kbase base object of the key bytes (null for off-heap)
   * @param koff  offset of the key bytes within kbase
   * @param klen  key length in bytes
   * @param vbase base object of the value bytes (null for off-heap)
   * @param voff  offset of the value bytes within vbase
   * @param vlen  value length in bytes
   * @return the reusable valueRow pointing at the copied value, or null when
   *         the batch is full (row capacity or page space exhausted)
   */
  @Override
  public UnsafeRow appendRow(Object kbase, long koff, int klen,
                             Object vbase, long voff, int vlen) {
    int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // two length prefixes + payload + trailing 8-byte "next" pointer
    final long recordLength = 2L * uaoSize + klen + vlen + 8L;
    // if run out of max supported rows or page size, return null
    if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
      return null;
    }
    long offset = page.getBaseOffset() + pageCursor;
    final long recordOffset = offset;
    // total size prefix covers klen + vlen + the key-size field itself
    UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
    UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
    offset += 2L * uaoSize;
    // copy key bytes, then value bytes, immediately after the prefixes
    Platform.copyMemory(kbase, koff, base, offset, klen);
    offset += klen;
    Platform.copyMemory(vbase, voff, base, offset, vlen);
    offset += vlen;
    // zero out the trailing 8-byte "pointer to next" slot
    Platform.putLong(base, offset, 0);
    pageCursor += recordLength;
    // remember where this row's key starts so getKeyRow can locate it later
    keyOffsets[numRows] = recordOffset + 2L * uaoSize;
    // point the reusable key/value rows at the freshly written record and
    // cache the row id so getKeyRow/getValueFromKey can skip re-pointing
    keyRowId = numRows;
    keyRow.pointTo(base, recordOffset + 2L * uaoSize, klen);
    valueRow.pointTo(base, recordOffset + 2L * uaoSize + klen, vlen);
    numRows++;
    return valueRow;
  }
  /**
   * Returns the key row in this batch at `rowId`. Returned key row is reused across calls.
   */
  @Override
  public UnsafeRow getKeyRow(int rowId) {
    assert(rowId >= 0);
    assert(rowId < numRows);
    if (keyRowId != rowId) { // if keyRowId == rowId, desired keyRow is already cached
      long offset = keyOffsets[rowId];
      // the key-size field sits one uaoSize before the key bytes
      int klen = UnsafeAlignedOffset.getSize(base, offset - UnsafeAlignedOffset.getUaoSize());
      keyRow.pointTo(base, offset, klen);
      // set keyRowId so we can check if desired row is cached
      keyRowId = rowId;
    }
    return keyRow;
  }
  /**
   * Returns the value row by two steps:
   * 1) looking up the key row with the same id (skipped if the key row is cached)
   * 2) retrieve the value row by reusing the metadata from step 1)
   * In most times, 1) is skipped because `getKeyRow(id)` is often called before `getValueRow(id)`.
   */
  @Override
  public UnsafeRow getValueFromKey(int rowId) {
    if (keyRowId != rowId) {
      getKeyRow(rowId);
    }
    assert(rowId >= 0);
    int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // keyRow currently points at this record's key bytes (ensured above)
    long offset = keyRow.getBaseOffset();
    int klen = keyRow.getSizeInBytes();
    // total size prefix (2 * uaoSize before the key) = klen + vlen + uaoSize,
    // so vlen = total - klen - uaoSize
    int vlen = UnsafeAlignedOffset.getSize(base, offset - uaoSize * 2L) - klen - uaoSize;
    // value bytes immediately follow the key bytes
    valueRow.pointTo(base, offset + klen, vlen);
    return valueRow;
  }
  /**
   * Returns an iterator to go through all rows
   */
  @Override
  public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
    return new org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>() {
      // reusable row views handed out by getKey()/getValue()
      private final UnsafeRow key = new UnsafeRow(keySchema.length());
      private final UnsafeRow value = new UnsafeRow(valueSchema.length());
      private long offsetInPage = 0;
      private int recordsInPage = 0;
      private int currentklen;
      private int currentvlen;
      private int totalLength;
      private boolean initialized = false;
      // lazily capture page start and row count on first next() call
      private void init() {
        if (page != null) {
          offsetInPage = page.getBaseOffset();
          recordsInPage = numRows;
        }
        initialized = true;
      }
      @Override
      public boolean next() {
        if (!initialized) init();
        // no records left: free the backing page and signal end of iteration
        if (recordsInPage == 0) {
          freeCurrentPage();
          return false;
        }
        int uaoSize = UnsafeAlignedOffset.getUaoSize();
        // total size prefix includes the key-size field; subtract it to get klen + vlen
        totalLength = UnsafeAlignedOffset.getSize(base, offsetInPage) - uaoSize;
        currentklen = UnsafeAlignedOffset.getSize(base, offsetInPage + uaoSize);
        currentvlen = totalLength - currentklen;
        key.pointTo(base, offsetInPage + 2L * uaoSize, currentklen);
        value.pointTo(base, offsetInPage + 2L * uaoSize + currentklen, currentvlen);
        // advance past prefixes + payload + trailing 8-byte next pointer
        offsetInPage += 2L * uaoSize + totalLength + 8;
        recordsInPage -= 1;
        return true;
      }
      @Override
      public UnsafeRow getKey() {
        return key;
      }
      @Override
      public UnsafeRow getValue() {
        return value;
      }
      @Override
      public void close() {
        // do nothing
      }
      // releases the memory page once iteration is exhausted
      private void freeCurrentPage() {
        if (page != null) {
          freePage(page);
          page = null;
        }
      }
    };
  }
  // Package-private: instances are created via RowBasedKeyValueBatch's factory.
  VariableLengthRowBasedKeyValueBatch(StructType keySchema, StructType valueSchema,
      int maxRows, TaskMemoryManager manager) {
    super(keySchema, valueSchema, maxRows, manager);
    this.keyOffsets = new long[maxRows];
  }
}
// Related articles:
// - Spark ExpressionDescription source
// - Spark FixedLengthRowBasedKeyValueBatch source
// - Spark RowBasedKeyValueBatch source