spark CountMinSketchAgg 源码
spark CountMinSketchAgg 代码
文件路径:/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CountMinSketchAgg.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal}
import org.apache.spark.sql.catalyst.trees.QuaternaryLike
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.sketch.CountMinSketch
/**
 * This function returns a count-min sketch of a column with the given eps, confidence and seed.
 * A count-min sketch is a probabilistic data structure used for summarizing streams of data in
 * sub-linear space, which is useful for equality predicates and join size estimation.
 * The result returned by the function is an array of bytes, which should be deserialized to a
 * `CountMinSketch` before usage.
 *
 * @param child child expression that can produce column value with `child.eval(inputRow)`
 * @param epsExpression relative error; must be a foldable, non-null, finite positive double
 * @param confidenceExpression confidence; must be a foldable, non-null double in (0.0, 1.0)
 * @param seedExpression random seed; must be a foldable, non-null integer
 * @param mutableAggBufferOffset buffer offset managed by the aggregation framework
 *                               (see `withNewMutableAggBufferOffset`)
 * @param inputAggBufferOffset buffer offset managed by the aggregation framework
 *                             (see `withNewInputAggBufferOffset`)
 */
// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = """
    _FUNC_(col, eps, confidence, seed) - Returns a count-min sketch of a column with the given eps,
      confidence and seed. The result is an array of bytes, which can be deserialized to a
      `CountMinSketch` before usage. Count-min sketch is a probabilistic data structure used for
      frequency estimation using sub-linear space.
  """,
  examples = """
    Examples:
      > SELECT hex(_FUNC_(col, 0.5d, 0.5d, 1)) FROM VALUES (1), (2), (1) AS tab(col);
       0000000100000000000000030000000100000004000000005D8D6AB90000000000000000000000000000000200000000000000010000000000000000
  """,
  group = "agg_funcs",
  since = "2.2.0")
// scalastyle:on line.size.limit
case class CountMinSketchAgg(
    child: Expression,
    epsExpression: Expression,
    confidenceExpression: Expression,
    seedExpression: Expression,
    override val mutableAggBufferOffset: Int,
    override val inputAggBufferOffset: Int)
  extends TypedImperativeAggregate[CountMinSketch]
  with ExpectsInputTypes
  with QuaternaryLike[Expression] {

  /** Creates the aggregate with both aggregation-buffer offsets set to 0. */
  def this(
      child: Expression,
      epsExpression: Expression,
      confidenceExpression: Expression,
      seedExpression: Expression) = {
    this(child, epsExpression, confidenceExpression, seedExpression, 0, 0)
  }

  // Mark as lazy so that they are not evaluated during tree transformation.
  // The casts rely on `inputTypes` (DoubleType, DoubleType, IntegerType) and on
  // `checkInputDataTypes` having verified that the expressions are foldable and non-null.
  private lazy val eps: Double = epsExpression.eval().asInstanceOf[Double]
  private lazy val confidence: Double = confidenceExpression.eval().asInstanceOf[Double]
  private lazy val seed: Int = seedExpression.eval().asInstanceOf[Int]

  /**
   * Validates the arguments at analysis time: the default input-type check first, then
   * foldability, non-nullness and the numeric ranges of eps and confidence.
   */
  override def checkInputDataTypes(): TypeCheckResult = {
    val defaultCheck = super.checkInputDataTypes()
    if (defaultCheck.isFailure) {
      defaultCheck
    } else if (!epsExpression.foldable || !confidenceExpression.foldable ||
        !seedExpression.foldable) {
      TypeCheckFailure(
        "The eps, confidence or seed provided must be a literal or foldable")
    } else if (epsExpression.eval() == null || confidenceExpression.eval() == null ||
        seedExpression.eval() == null) {
      TypeCheckFailure("The eps, confidence or seed provided should not be null")
    } else if (!(eps > 0.0)) {
      // Negated comparison: every comparison with NaN is false, so `eps <= 0.0` alone
      // would let NaN through.
      TypeCheckFailure(s"Relative error must be positive (current value = $eps)")
    } else if (eps.isInfinite) {
      // An infinite relative error is meaningless; reject it here instead of handing it
      // to `CountMinSketch.create`.
      TypeCheckFailure(s"Relative error must be finite (current value = $eps)")
    } else if (!(confidence > 0.0 && confidence < 1.0)) {
      // Written as a negated in-range test so that NaN is rejected as well.
      TypeCheckFailure(s"Confidence must be within range (0.0, 1.0) (current value = $confidence)")
    } else {
      TypeCheckSuccess
    }
  }

  /** Creates an empty sketch from the validated eps, confidence and seed. */
  override def createAggregationBuffer(): CountMinSketch = {
    CountMinSketch.create(eps, confidence, seed)
  }

  /** Adds the child's value for `input` to `buffer`; null values are skipped. */
  override def update(buffer: CountMinSketch, input: InternalRow): CountMinSketch = {
    val value = child.eval(input)
    // Null values do not contribute to the sketch.
    if (value != null) {
      child.dataType match {
        // For string type, we can get bytes of our `UTF8String` directly, and call the `addBinary`
        // instead of `addString` to avoid unnecessary conversion.
        case StringType => buffer.addBinary(value.asInstanceOf[UTF8String].getBytes)
        case _ => buffer.add(value)
      }
    }
    buffer
  }

  /** Merges `input` into `buffer` in place and returns `buffer`. */
  override def merge(buffer: CountMinSketch, input: CountMinSketch): CountMinSketch = {
    buffer.mergeInPlace(input)
    buffer
  }

  /** The final result is the serialized sketch bytes. */
  override def eval(buffer: CountMinSketch): Any = serialize(buffer)

  override def serialize(buffer: CountMinSketch): Array[Byte] = {
    buffer.toByteArray
  }

  override def deserialize(storageFormat: Array[Byte]): CountMinSketch = {
    CountMinSketch.readFrom(storageFormat)
  }

  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): CountMinSketchAgg =
    copy(mutableAggBufferOffset = newMutableAggBufferOffset)

  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): CountMinSketchAgg =
    copy(inputAggBufferOffset = newInputAggBufferOffset)

  override def inputTypes: Seq[AbstractDataType] = {
    Seq(TypeCollection(IntegralType, StringType, BinaryType), DoubleType, DoubleType, IntegerType)
  }

  override def nullable: Boolean = false

  override def dataType: DataType = BinaryType

  /** Result for empty input: the serialized bytes of a freshly created, empty sketch. */
  override def defaultResult: Option[Literal] =
    Option(Literal.create(eval(createAggregationBuffer()), dataType))

  override def prettyName: String = "count_min_sketch"

  override def first: Expression = child
  override def second: Expression = epsExpression
  override def third: Expression = confidenceExpression
  override def fourth: Expression = seedExpression

  override protected def withNewChildrenInternal(
      first: Expression,
      second: Expression,
      third: Expression,
      fourth: Expression): CountMinSketchAgg =
    copy(
      child = first,
      epsExpression = second,
      confidenceExpression = third,
      seedExpression = fourth)
}
相关信息
相关文章
spark ApproxCountDistinctForIntervals 源码
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦