spark OrcDeserializer 源码
spark OrcDeserializer 代码
文件路径:/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc
import org.apache.hadoop.io._
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
* A deserializer to deserialize ORC structs to Spark rows.
*/
class OrcDeserializer(
requiredSchema: StructType,
requestedColIds: Array[Int]) {
private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
// `fieldWriters(index)` is
// - null if the respective source column is missing, since the output value
// is always null in this case
// - a function that updates target column `index` otherwise.
private val fieldWriters: Array[WritableComparable[_] => Unit] = {
requiredSchema.zipWithIndex
.map { case (f, index) =>
if (requestedColIds(index) == -1) {
null
} else {
// Create a RowUpdater instance for converting Orc objects to Catalyst rows. If any fields
// in the Orc result schema have associated existence default values, maintain a
// boolean array to track which fields have been explicitly assigned for each row.
val rowUpdater: RowUpdater =
if (requiredSchema.hasExistenceDefaultValues) {
resetExistenceDefaultsBitmask(requiredSchema)
new RowUpdaterWithBitmask(resultRow, requiredSchema.existenceDefaultsBitmask)
} else {
new RowUpdater(resultRow)
}
val writer: (Int, WritableComparable[_]) => Unit =
(ordinal, value) =>
if (value == null) {
rowUpdater.setNullAt(ordinal)
} else {
val writerFunc = newWriter(f.dataType, rowUpdater)
writerFunc(ordinal, value)
}
(value: WritableComparable[_]) => writer(index, value)
}
}.toArray
}
def deserialize(orcStruct: OrcStruct): InternalRow = {
var targetColumnIndex = 0
while (targetColumnIndex < fieldWriters.length) {
if (fieldWriters(targetColumnIndex) != null) {
val value = orcStruct.getFieldValue(requestedColIds(targetColumnIndex))
fieldWriters(targetColumnIndex)(value)
}
targetColumnIndex += 1
}
applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
resultRow
}
def deserializeFromValues(orcValues: Seq[WritableComparable[_]]): InternalRow = {
var targetColumnIndex = 0
while (targetColumnIndex < fieldWriters.length) {
if (fieldWriters(targetColumnIndex) != null) {
val value = orcValues(requestedColIds(targetColumnIndex))
fieldWriters(targetColumnIndex)(value)
}
targetColumnIndex += 1
}
resultRow
}
/**
* Creates a writer to write ORC values to Catalyst data structure at the given ordinal.
*/
private def newWriter(
dataType: DataType, updater: CatalystDataUpdater): (Int, WritableComparable[_]) => Unit =
dataType match {
case NullType => (ordinal, _) =>
updater.setNullAt(ordinal)
case BooleanType => (ordinal, value) =>
updater.setBoolean(ordinal, value.asInstanceOf[BooleanWritable].get)
case ByteType => (ordinal, value) =>
updater.setByte(ordinal, value.asInstanceOf[ByteWritable].get)
case ShortType => (ordinal, value) =>
updater.setShort(ordinal, value.asInstanceOf[ShortWritable].get)
case IntegerType | _: YearMonthIntervalType => (ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)
case LongType | _: DayTimeIntervalType | _: TimestampNTZType => (ordinal, value) =>
updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)
case FloatType => (ordinal, value) =>
updater.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)
case DoubleType => (ordinal, value) =>
updater.setDouble(ordinal, value.asInstanceOf[DoubleWritable].get)
case StringType => (ordinal, value) =>
updater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Text].copyBytes))
case BinaryType => (ordinal, value) =>
val binary = value.asInstanceOf[BytesWritable]
val bytes = new Array[Byte](binary.getLength)
System.arraycopy(binary.getBytes, 0, bytes, 0, binary.getLength)
updater.set(ordinal, bytes)
case DateType => (ordinal, value) =>
updater.setInt(ordinal, OrcShimUtils.getGregorianDays(value))
case TimestampType => (ordinal, value) =>
updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
val v = OrcShimUtils.getDecimal(value)
v.changePrecision(precision, scale)
updater.set(ordinal, v)
case st: StructType =>
val result = new SpecificInternalRow(st)
val fieldUpdater = new RowUpdater(result)
val fieldConverters = st.map(_.dataType).map { dt =>
newWriter(dt, fieldUpdater)
}.toArray
val containerUpdater = updater match {
case r: RowUpdater => r
case _ =>
// If the struct is contained by an array or map, we cannot reuse the same result row.
// We must copy the result row before setting it into the array or map
new CatalystDataUpdater {
override def set(ordinal: Int, value: Any) = {
updater.set(ordinal, value.asInstanceOf[SpecificInternalRow].copy())
}
}
}
(ordinal, value) =>
val orcStruct = value.asInstanceOf[OrcStruct]
var i = 0
while (i < st.length) {
val value = orcStruct.getFieldValue(i)
if (value == null) {
result.setNullAt(i)
} else {
fieldConverters(i)(i, value)
}
i += 1
}
containerUpdater.set(ordinal, result)
case ArrayType(elementType, _) => (ordinal, value) =>
val orcArray = value.asInstanceOf[OrcList[WritableComparable[_]]]
val length = orcArray.size()
val result = createArrayData(elementType, length)
val elementUpdater = new ArrayDataUpdater(result)
val elementConverter = newWriter(elementType, elementUpdater)
var i = 0
while (i < length) {
val value = orcArray.get(i)
if (value == null) {
result.setNullAt(i)
} else {
elementConverter(i, value)
}
i += 1
}
updater.set(ordinal, result)
case MapType(keyType, valueType, _) => (ordinal, value) =>
val orcMap = value.asInstanceOf[OrcMap[WritableComparable[_], WritableComparable[_]]]
val length = orcMap.size()
val keyArray = createArrayData(keyType, length)
val keyUpdater = new ArrayDataUpdater(keyArray)
val keyConverter = newWriter(keyType, keyUpdater)
val valueArray = createArrayData(valueType, length)
val valueUpdater = new ArrayDataUpdater(valueArray)
val valueConverter = newWriter(valueType, valueUpdater)
var i = 0
val it = orcMap.entrySet().iterator()
while (it.hasNext) {
val entry = it.next()
keyConverter(i, entry.getKey)
val value = entry.getValue
if (value == null) {
valueArray.setNullAt(i)
} else {
valueConverter(i, value)
}
i += 1
}
// The ORC map will never have null or duplicated map keys, it's safe to create a
// ArrayBasedMapData directly here.
updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
case udt: UserDefinedType[_] => newWriter(udt.sqlType, updater)
case _ =>
throw QueryExecutionErrors.dataTypeUnsupportedYetError(dataType)
}
private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
case IntegerType | _: YearMonthIntervalType =>
UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
case LongType | _: DayTimeIntervalType =>
UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
case _ => new GenericArrayData(new Array[Any](length))
}
/**
* A base interface for updating values inside catalyst data structure like `InternalRow` and
* `ArrayData`.
*/
sealed trait CatalystDataUpdater {
def set(ordinal: Int, value: Any): Unit
def setNullAt(ordinal: Int): Unit = set(ordinal, null)
def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value)
def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value)
def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value)
def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value)
def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value)
def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value)
def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value)
}
class RowUpdater(row: InternalRow) extends CatalystDataUpdater {
override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)
override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value)
override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value)
override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value)
override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value)
override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value)
override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value)
override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value)
}
final class ArrayDataUpdater(array: ArrayData) extends CatalystDataUpdater {
override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value)
override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value)
override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value)
override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value)
override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value)
override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value)
override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
}
/**
* Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
* assignments are complete, it is possible to inspect the bitmask to determine which columns have
* been written at least once.
*/
final class RowUpdaterWithBitmask(
row: InternalRow, bitmask: Array[Boolean]) extends RowUpdater(row) {
override def setNullAt(ordinal: Int): Unit = {
bitmask(ordinal) = false
super.setNullAt(ordinal)
}
override def set(ordinal: Int, value: Any): Unit = {
bitmask(ordinal) = false
super.set(ordinal, value)
}
override def setBoolean(ordinal: Int, value: Boolean): Unit = {
bitmask(ordinal) = false
super.setBoolean(ordinal, value)
}
override def setByte(ordinal: Int, value: Byte): Unit = {
bitmask(ordinal) = false
super.setByte(ordinal, value)
}
override def setShort(ordinal: Int, value: Short): Unit = {
bitmask(ordinal) = false
super.setShort(ordinal, value)
}
override def setInt(ordinal: Int, value: Int): Unit = {
bitmask(ordinal) = false
super.setInt(ordinal, value)
}
override def setLong(ordinal: Int, value: Long): Unit = {
bitmask(ordinal) = false
super.setLong(ordinal, value)
}
override def setDouble(ordinal: Int, value: Double): Unit = {
bitmask(ordinal) = false
super.setDouble(ordinal, value)
}
override def setFloat(ordinal: Int, value: Float): Unit = {
bitmask(ordinal) = false
super.setFloat(ordinal, value)
}
}
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦