spark PythonUDF 源码
spark PythonUDF 代码
文件路径:/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
import org.apache.spark.sql.catalyst.trees.TreePattern.{PYTHON_UDF, TreePattern}
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.types.DataType
/**
* Helper functions for [[PythonUDF]]
*/
object PythonUDF {
private[this] val SCALAR_TYPES = Set(
PythonEvalType.SQL_BATCHED_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
)
def isScalarPythonUDF(e: Expression): Boolean = {
e.isInstanceOf[PythonUDF] && SCALAR_TYPES.contains(e.asInstanceOf[PythonUDF].evalType)
}
def isGroupedAggPandasUDF(e: Expression): Boolean = {
e.isInstanceOf[PythonUDF] &&
e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
}
// This is currently same as GroupedAggPandasUDF, but we might support new types in the future,
// e.g, N -> N transform.
def isWindowPandasUDF(e: Expression): Boolean = isGroupedAggPandasUDF(e)
}
/**
* A serialized version of a Python lambda function. This is a special expression, which needs a
* dedicated physical operator to execute it, and thus can't be pushed down to data sources.
*/
case class PythonUDF(
name: String,
func: PythonFunction,
dataType: DataType,
children: Seq[Expression],
evalType: Int,
udfDeterministic: Boolean,
resultId: ExprId = NamedExpression.newExprId)
extends Expression with Unevaluable with NonSQLExpression with UserDefinedExpression {
override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)
override def toString: String = s"$name(${children.mkString(", ")})#${resultId.id}$typeSuffix"
final override val nodePatterns: Seq[TreePattern] = Seq(PYTHON_UDF)
lazy val resultAttribute: Attribute = AttributeReference(toPrettySQL(this), dataType, nullable)(
exprId = resultId)
override def nullable: Boolean = true
override lazy val canonicalized: Expression = {
val canonicalizedChildren = children.map(_.canonicalized)
// `resultId` can be seen as cosmetic variation in PythonUDF, as it doesn't affect the result.
this.copy(resultId = ExprId(-1)).withNewChildren(canonicalizedChildren)
}
override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): PythonUDF =
copy(children = newChildren)
}
/**
* A place holder used when printing expressions without debugging information such as the
* result id.
*/
case class PrettyPythonUDF(
name: String,
dataType: DataType,
children: Seq[Expression])
extends Expression with Unevaluable with NonSQLExpression {
override def toString: String = s"$name(${children.mkString(", ")})"
override def nullable: Boolean = true
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): PrettyPythonUDF = copy(children = newChildren)
}
相关信息
相关文章
spark ApplyFunctionExpression 源码
spark BloomFilterMightContain 源码
spark CallMethodViaReflection 源码
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦