Spark CheckAnalysis source code
File path: /sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Median, PercentileCont, PercentileDisc}
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils
/**
 * Throws user-facing errors when passed invalid queries that fail to analyze.
*/
trait CheckAnalysis extends PredicateHelper with LookupCatalog {
protected def isView(nameParts: Seq[String]): Boolean
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
/**
* Override to provide additional checks for correct analysis.
* These rules will be evaluated after our built-in check rules.
*/
val extendedCheckRules: Seq[LogicalPlan => Unit] = Nil
val DATA_TYPE_MISMATCH_ERROR = TreeNodeTag[Boolean]("dataTypeMismatchError")
protected def failAnalysis(msg: String): Nothing = {
throw new AnalysisException(msg)
}
protected def containsMultipleGenerators(exprs: Seq[Expression]): Boolean = {
exprs.flatMap(_.collect {
case e: Generator => e
}).length > 1
}
protected def hasMapType(dt: DataType): Boolean = {
dt.existsRecursively(_.isInstanceOf[MapType])
}
protected def mapColumnInSetOperation(plan: LogicalPlan): Option[Attribute] = plan match {
case _: Intersect | _: Except | _: Distinct =>
plan.output.find(a => hasMapType(a.dataType))
case d: Deduplicate =>
d.keys.find(a => hasMapType(a.dataType))
case _ => None
}
private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
limitExpr match {
case e if !e.foldable => failAnalysis(
s"The $name expression must evaluate to a constant value, but got " +
limitExpr.sql)
case e if e.dataType != IntegerType => failAnalysis(
s"The $name expression must be integer type, but got " +
e.dataType.catalogString)
case e =>
e.eval() match {
case null => failAnalysis(
s"The evaluated $name expression must not be null, but got ${limitExpr.sql}")
case v: Int if v < 0 => failAnalysis(
s"The $name expression must be equal to or greater than 0, but got $v")
case _ => // OK
}
}
}
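  // Illustrative queries rejected by the check above (hypothetical table t):
  //   SELECT * FROM t LIMIT rand()  -- limit expression is not foldable
  //   SELECT * FROM t LIMIT -1      -- limit must be equal to or greater than 0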
private def isMapWithStringKey(e: Expression): Boolean = if (e.resolved) {
e.dataType match {
case m: MapType => m.keyType.isInstanceOf[StringType]
case _ => false
}
} else {
false
}
private def failUnresolvedAttribute(
operator: LogicalPlan,
a: Attribute,
errorClass: String): Nothing = {
val missingCol = a.sql
val candidates = operator.inputSet.toSeq.map(_.qualifiedName)
val orderedCandidates = StringUtils.orderStringsBySimilarity(missingCol, candidates)
throw QueryCompilationErrors.unresolvedAttributeError(
errorClass, missingCol, orderedCandidates, a.origin)
}
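  // The candidates above are ordered by name similarity, so for a typo such as
  // SELECT nmae FROM t (hypothetical), the column "name" is suggested first.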
def checkAnalysis(plan: LogicalPlan): Unit = {
val inlineCTE = InlineCTE(alwaysInline = true)
val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int)]
inlineCTE.buildCTEMap(plan, cteMap)
cteMap.values.foreach { case (relation, refCount) =>
// If a CTE relation is never used, it will disappear after inline. Here we explicitly check
// analysis for it, to make sure the entire query plan is valid.
if (refCount == 0) checkAnalysis0(relation.child)
}
// Inline all CTEs in the plan to help check query plan structures in subqueries.
checkAnalysis0(inlineCTE(plan))
}
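  // Example: in WITH unused AS (SELECT * FROM missing_table) SELECT 1 (hypothetical),
  // the CTE is never referenced (refCount == 0), but the explicit check above still
  // reports that missing_table cannot be resolved.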
def checkAnalysis0(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
case p if p.analyzed => // Skip already analyzed sub-plans
case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
throw new IllegalStateException(
"[BUG] logical plan should not have output of char/varchar type: " + leaf)
case u: UnresolvedNamespace =>
u.schemaNotFound(u.multipartIdentifier)
case u: UnresolvedTable =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedView =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedTableOrView =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedRelation =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedFunc =>
throw QueryCompilationErrors.noSuchFunctionError(
u.multipartIdentifier, u, u.possibleQualifiedName)
case u: UnresolvedHint =>
u.failAnalysis(s"Hint not found: ${u.name}")
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
write.table.tableNotFound(tblName)
case command: V2PartitionCommand =>
command.table match {
case r @ ResolvedTable(_, _, table, _) => table match {
case t: SupportsPartitionManagement =>
if (t.partitionSchema.isEmpty) {
failAnalysis(s"Table ${r.name} is not partitioned.")
}
case _ =>
failAnalysis(s"Table ${r.name} does not support partition management.")
}
case _ =>
}
// `ShowTableExtended` should have been converted to the v1 command if the table is v1.
case _: ShowTableExtended =>
throw QueryCompilationErrors.commandUnsupportedInV2TableError("SHOW TABLE EXTENDED")
case operator: LogicalPlan =>
operator transformExpressionsDown {
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
// the argument functions will not get resolved, but we should report the argument type
// check failure instead of claiming the argument functions are unresolved.
case hof: HigherOrderFunction
if hof.argumentsResolved && hof.checkArgumentDataTypes().isFailure =>
hof.checkArgumentDataTypes() match {
case checkRes: TypeCheckResult.DataTypeMismatch =>
hof.dataTypeMismatch(hof, checkRes)
case TypeCheckResult.TypeCheckFailure(message) =>
hof.failAnalysis(
s"cannot resolve '${hof.sql}' due to argument data type mismatch: $message")
}
// If an attribute can't be resolved as a map key of string type, either the key should be
// surrounded with single quotes, or there is a typo in the attribute name.
case GetMapValue(map, key: Attribute) if isMapWithStringKey(map) && !key.resolved =>
failUnresolvedAttribute(operator, key, "UNRESOLVED_MAP_KEY")
}
getAllExpressions(operator).foreach(_.foreachUp {
case a: Attribute if !a.resolved =>
failUnresolvedAttribute(operator, a, "UNRESOLVED_COLUMN")
case s: Star =>
withPosition(s) {
throw QueryCompilationErrors.invalidStarUsageError(operator.nodeName, Seq(s))
}
case e: Expression if e.checkInputDataTypes().isFailure =>
e.checkInputDataTypes() match {
case checkRes: TypeCheckResult.DataTypeMismatch =>
e.setTagValue(DATA_TYPE_MISMATCH_ERROR, true)
e.dataTypeMismatch(e, checkRes)
case TypeCheckResult.TypeCheckFailure(message) =>
e.setTagValue(DATA_TYPE_MISMATCH_ERROR, true)
e.failAnalysis(
s"cannot resolve '${e.sql}' due to data type mismatch: $message" +
extraHintForAnsiTypeCoercionExpression(operator))
}
case c: Cast if !c.resolved =>
failAnalysis(s"invalid cast from ${c.child.dataType.catalogString} to " +
c.dataType.catalogString)
case e: RuntimeReplaceable if !e.replacement.resolved =>
throw new IllegalStateException("Illegal RuntimeReplaceable: " + e +
"\nReplacement is unresolved: " + e.replacement)
case g: Grouping =>
failAnalysis("grouping() can only be used with GroupingSets/Cube/Rollup")
case g: GroupingID =>
failAnalysis("grouping_id() can only be used with GroupingSets/Cube/Rollup")
case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
!e.isInstanceOf[WindowExpression] && e.resolved =>
val w = e.children.find(_.isInstanceOf[WindowFunction]).get
failAnalysis(s"Window function $w requires an OVER clause.")
case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) =>
failAnalysis(s"Distinct window functions are not supported: $w")
case w @ WindowExpression(wf: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
if order.isEmpty || !frame.isOffset =>
failAnalysis(s"${wf.prettyName} function can only be evaluated in an ordered " +
s"row-based window frame with a single offset: $w")
case w: WindowExpression =>
// Only allow window functions with an aggregate expression or an offset window
// function or a Pandas window UDF.
w.windowFunction match {
case agg @ AggregateExpression(
_: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _)
if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification !=
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) =>
failAnalysis(
s"Cannot specify order by or frame for '${agg.aggregateFunction.prettyName}'.")
case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
_: AggregateWindowFunction => // OK
case f: PythonUDF if PythonUDF.isWindowPandasUDF(f) => // OK
case other =>
failAnalysis(s"Expression '$other' not supported within a window function.")
}
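            // Illustrative window queries rejected above (hypothetical table t):
            //   SELECT count(DISTINCT a) OVER (PARTITION BY b) FROM t  -- distinct window function
            //   SELECT lead(a, 1) OVER () FROM t                       -- offset function without ORDER BY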
case s: SubqueryExpression =>
checkSubqueryExpression(operator, s)
case e: ExpressionWithRandomSeed if !e.seedExpression.foldable =>
failAnalysis(
s"Input argument to ${e.prettyName} must be a constant.")
case _ =>
})
operator match {
case etw: EventTimeWatermark =>
etw.eventTime.dataType match {
case s: StructType
if s.find(_.name == "end").map(_.dataType) == Some(TimestampType) =>
case _: TimestampType =>
case _ =>
failAnalysis(
s"Event time must be defined on a window or a timestamp, but " +
s"${etw.eventTime.name} is of type ${etw.eventTime.dataType.catalogString}")
}
case f: Filter if f.condition.dataType != BooleanType =>
failAnalysis(
s"filter expression '${f.condition.sql}' " +
s"of type ${f.condition.dataType.catalogString} is not a boolean.")
case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType =>
failAnalysis(
s"join condition '${condition.sql}' " +
s"of type ${condition.dataType.catalogString} is not a boolean.")
case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
if condition.dataType != BooleanType =>
failAnalysis(
s"join condition '${condition.sql}' " +
s"of type ${condition.dataType.catalogString} is not a boolean.")
case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
if (!toleranceAssertion.foldable) {
failAnalysis("Input argument tolerance must be a constant.")
}
if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {
failAnalysis("Input argument tolerance must be non-negative.")
}
case Aggregate(groupingExprs, aggregateExprs, _) =>
def checkValidAggregateExpression(expr: Expression): Unit = expr match {
case expr: Expression if AggregateExpression.isAggregate(expr) =>
val aggFunction = expr match {
case agg: AggregateExpression => agg.aggregateFunction
case udf: PythonUDF => udf
}
aggFunction.children.foreach { child =>
child.foreach {
case expr: Expression if AggregateExpression.isAggregate(expr) =>
failAnalysis(
s"It is not allowed to use an aggregate function in the argument of " +
s"another aggregate function. Please use the inner aggregate function " +
s"in a sub-query.")
case other => // OK
}
if (!child.deterministic) {
failAnalysis(
s"nondeterministic expression ${expr.sql} should not " +
s"appear in the arguments of an aggregate function.")
}
}
case e: Attribute if groupingExprs.isEmpty =>
                // Collect all [[AggregateExpression]]s.
val aggExprs = aggregateExprs.filter(_.collect {
case a: AggregateExpression => a
}.nonEmpty)
failAnalysis(
s"grouping expressions sequence is empty, " +
s"and '${e.sql}' is not an aggregate function. " +
s"Wrap '${aggExprs.map(_.sql).mkString("(", ", ", ")")}' in windowing " +
s"function(s) or wrap '${e.sql}' in first() (or first_value) " +
s"if you don't care which value you get."
)
case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) =>
throw QueryCompilationErrors.columnNotInGroupByClauseError(e)
case s: ScalarSubquery
if s.children.nonEmpty && !groupingExprs.exists(_.semanticEquals(s)) =>
failAnalysis(s"Correlated scalar subquery '${s.sql}' is neither " +
"present in the group by, nor in an aggregate function. Add it to group by " +
"using ordinal position or wrap it in first() (or first_value) if you don't " +
"care which value you get.")
case e if groupingExprs.exists(_.semanticEquals(e)) => // OK
case e => e.children.foreach(checkValidAggregateExpression)
}
def checkValidGroupingExprs(expr: Expression): Unit = {
if (expr.exists(_.isInstanceOf[AggregateExpression])) {
failAnalysis(
"aggregate functions are not allowed in GROUP BY, but found " + expr.sql)
}
// Check if the data type of expr is orderable.
if (!RowOrdering.isOrderable(expr.dataType)) {
failAnalysis(
s"expression ${expr.sql} cannot be used as a grouping expression " +
s"because its data type ${expr.dataType.catalogString} is not an orderable " +
s"data type.")
}
if (!expr.deterministic) {
// This is just a sanity check, our analysis rule PullOutNondeterministic should
// already pull out those nondeterministic expressions and evaluate them in
// a Project node.
failAnalysis(s"nondeterministic expression ${expr.sql} should not " +
s"appear in grouping expression.")
}
}
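            // Illustrative GROUP BY rejections (hypothetical table t):
            //   SELECT max(a) FROM t GROUP BY max(a)   -- aggregate function in GROUP BY
            //   SELECT 1 FROM t GROUP BY map('k', a)   -- map type is not orderable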
groupingExprs.foreach(checkValidGroupingExprs)
aggregateExprs.foreach(checkValidAggregateExpression)
case CollectMetrics(name, metrics, _) =>
if (name == null || name.isEmpty) {
operator.failAnalysis(s"observed metrics should be named: $operator")
}
// Check if an expression is a valid metric. A metric must meet the following criteria:
// - Is not a window function;
            // - Is not a nested aggregate function;
// - Is not a distinct aggregate function;
// - Has only non-deterministic functions that are nested inside an aggregate function;
// - Has only attributes that are nested inside an aggregate function.
def checkMetric(s: Expression, e: Expression, seenAggregate: Boolean = false): Unit = {
e match {
case _: WindowExpression =>
e.failAnalysis(
"window expressions are not allowed in observed metrics, but found: " + s.sql)
case _ if !e.deterministic && !seenAggregate =>
e.failAnalysis(s"non-deterministic expression ${s.sql} can only be used " +
"as an argument to an aggregate function.")
case a: AggregateExpression if seenAggregate =>
e.failAnalysis(
"nested aggregates are not allowed in observed metrics, but found: " + s.sql)
case a: AggregateExpression if a.isDistinct =>
e.failAnalysis(
"distinct aggregates are not allowed in observed metrics, but found: " + s.sql)
case a: AggregateExpression if a.filter.isDefined =>
e.failAnalysis("aggregates with filter predicate are not allowed in " +
"observed metrics, but found: " + s.sql)
case _: Attribute if !seenAggregate =>
e.failAnalysis (s"attribute ${s.sql} can only be used as an argument to an " +
"aggregate function.")
case _: AggregateExpression =>
e.children.foreach(checkMetric (s, _, seenAggregate = true))
case _ =>
e.children.foreach(checkMetric (s, _, seenAggregate))
}
}
metrics.foreach(m => checkMetric(m, m))
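            // Illustrative (Dataset API): df.observe("stats", count(lit(1))) satisfies the
            // criteria above, while df.observe("bad", rand()) is rejected because the
            // non-deterministic expression is not wrapped in an aggregate function.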
// see Analyzer.ResolveUnpivot
// given ids must be AttributeReference when no values given
          case up @ Unpivot(Some(ids), None, _, _, _, _)
              if up.childrenResolved && ids.forall(_.resolved) &&
                ids.exists(!_.isInstanceOf[AttributeReference]) =>
throw QueryCompilationErrors.unpivotRequiresAttributes("id", "value", up.ids.get)
// given values must be AttributeReference when no ids given
          case up @ Unpivot(None, Some(values), _, _, _, _)
              if up.childrenResolved && values.forall(_.forall(_.resolved)) &&
                values.exists(_.exists(!_.isInstanceOf[AttributeReference])) =>
throw QueryCompilationErrors.unpivotRequiresAttributes("value", "id", values.flatten)
// given values must not be empty seq
          case up @ Unpivot(Some(ids), Some(Seq()), _, _, _, _)
if up.childrenResolved && ids.forall(_.resolved) =>
throw QueryCompilationErrors.unpivotRequiresValueColumns()
// all values must have same length as there are value column names
          case up @ Unpivot(Some(ids), Some(values), _, _, _, _)
if up.childrenResolved && ids.forall(_.resolved) &&
values.exists(_.length != up.valueColumnNames.length) =>
throw QueryCompilationErrors.unpivotValueSizeMismatchError(up.valueColumnNames.length)
// see TypeCoercionBase.UnpivotCoercion
case up: Unpivot if up.canBeCoercioned && !up.valuesTypeCoercioned =>
throw QueryCompilationErrors.unpivotValueDataTypeMismatchError(up.values.get)
case Sort(orders, _, _) =>
orders.foreach { order =>
if (!RowOrdering.isOrderable(order.dataType)) {
failAnalysis(
s"sorting is not supported for columns of type ${order.dataType.catalogString}")
}
}
case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case LocalLimit(limitExpr, child) =>
checkLimitLikeClause("limit", limitExpr)
child match {
case Offset(offsetExpr, _) =>
val limit = limitExpr.eval().asInstanceOf[Int]
val offset = offsetExpr.eval().asInstanceOf[Int]
if (Int.MaxValue - limit < offset) {
failAnalysis(
s"""
|The sum of the LIMIT clause and the OFFSET clause must not be greater than
|the maximum 32-bit integer value (2,147,483,647),
|but found limit = $limit, offset = $offset.
|""".stripMargin.replace("\n", " "))
}
case _ =>
}
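            // Illustrative: SELECT * FROM t LIMIT 2147483647 OFFSET 1 (hypothetical table t)
            // fails the check above because limit + offset exceeds Int.MaxValue.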
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
case _: Union | _: SetOperation if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
def ordinalNumber(i: Int): String = i match {
case 0 => "first"
case 1 => "second"
case 2 => "third"
case i => s"${i + 1}th"
}
val ref = dataTypes(operator.children.head)
operator.children.tail.zipWithIndex.foreach { case (child, ti) =>
// Check the number of columns
if (child.output.length != ref.length) {
failAnalysis(
s"""
|${operator.nodeName} can only be performed on tables with the same number
|of columns, but the first table has ${ref.length} columns and
|the ${ordinalNumber(ti + 1)} table has ${child.output.length} columns
""".stripMargin.replace("\n", " ").trim())
}
val dataTypesAreCompatibleFn = getDataTypesAreCompatibleFn(operator)
// Check if the data types match.
dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
// SPARK-18058: we shall not care about the nullability of columns
if (!dataTypesAreCompatibleFn(dt1, dt2)) {
val errorMessage =
s"""
|${operator.nodeName} can only be performed on tables with compatible
|column types. The ${ordinalNumber(ci)} column of the
|${ordinalNumber(ti + 1)} table is ${dt1.catalogString} type which is not
|compatible with ${dt2.catalogString} at the same column of the first table
""".stripMargin.replace("\n", " ").trim()
failAnalysis(errorMessage + extraHintForAnsiTypeCoercionPlan(operator))
}
}
}
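            // Illustrative: SELECT 1, 2 UNION SELECT 3 fails the column-count check above,
            // and SELECT array(1) EXCEPT SELECT 1 fails the type-compatibility check.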
case create: V2CreateTablePlan =>
val references = create.partitioning.flatMap(_.references).toSet
val badReferences = references.map(_.fieldNames).flatMap { column =>
create.tableSchema.findNestedField(column) match {
case Some(_) =>
None
case _ =>
Some(s"${column.quoted} is missing or is in a map or array")
}
}
if (badReferences.nonEmpty) {
failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
}
create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
case write: V2WriteCommand if write.resolved =>
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
case alter: AlterTableCommand =>
checkAlterTableCommand(alter)
case _ => // Falls back to the following checks
}
operator match {
case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
val missingAttributes = o.missingInput.mkString(",")
val input = o.inputSet.mkString(",")
val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
s"from $input in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}."
val resolver = plan.conf.resolver
val attrsWithSameName = o.missingInput.filter { missing =>
o.inputSet.exists(input => resolver(missing.name, input.name))
}
val msg = if (attrsWithSameName.nonEmpty) {
val sameNames = attrsWithSameName.map(_.name).mkString(",")
s"$msgForMissingAttributes Attribute(s) with the same name appear in the " +
s"operation: $sameNames. Please check if the right attribute(s) are used."
} else {
msgForMissingAttributes
}
failAnalysis(msg)
case p @ Project(exprs, _) if containsMultipleGenerators(exprs) =>
failAnalysis(
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.sql).mkString(",")}""".stripMargin)
case p @ Project(projectList, _) =>
projectList.foreach(_.transformDownWithPruning(
_.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) {
case UnresolvedWindowExpression(_, windowSpec) =>
throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
})
case j: Join if !j.duplicateResolved =>
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)
case i: Intersect if !i.duplicateResolved =>
val conflictingAttributes = i.left.outputSet.intersect(i.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Intersect:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
case e: Except if !e.duplicateResolved =>
val conflictingAttributes = e.left.outputSet.intersect(e.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Except:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
case j: AsOfJoin if !j.duplicateResolved =>
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in AsOfJoin:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)
// TODO: although map type is not orderable, technically map type should be able to be
// used in equality comparison, remove this type check once we support it.
case o if mapColumnInSetOperation(o).isDefined =>
val mapCol = mapColumnInSetOperation(o).get
failAnalysis("Cannot have map type columns in DataFrame which calls " +
s"set operations(intersect, except, etc.), but the type of column ${mapCol.name} " +
"is " + mapCol.dataType.catalogString)
case o if o.expressions.exists(!_.deterministic) &&
!o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
// Lateral join is checked in checkSubqueryExpression.
!o.isInstanceOf[LateralJoin] =>
// The rule above is used to check Aggregate operator.
failAnalysis(
s"""nondeterministic expressions are only allowed in
|Project, Filter, Aggregate or Window, found:
| ${o.expressions.map(_.sql).mkString(",")}
|in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}
""".stripMargin)
case _: UnresolvedHint => throw new IllegalStateException(
"Logical hint operator should be removed during analysis.")
case f @ Filter(condition, _)
if PlanHelper.specialExpressionsInUnsupportedOperator(f).nonEmpty =>
val invalidExprSqls = PlanHelper.specialExpressionsInUnsupportedOperator(f).map(_.sql)
failAnalysis(
s"""
|Aggregate/Window/Generate expressions are not valid in where clause of the query.
|Expression in where clause: [${condition.sql}]
|Invalid expressions: [${invalidExprSqls.mkString(", ")}]""".stripMargin)
case other if PlanHelper.specialExpressionsInUnsupportedOperator(other).nonEmpty =>
val invalidExprSqls =
PlanHelper.specialExpressionsInUnsupportedOperator(other).map(_.sql)
failAnalysis(
s"""
|The query operator `${other.nodeName}` contains one or more unsupported
|expression types Aggregate, Window or Generate.
|Invalid expressions: [${invalidExprSqls.mkString(", ")}]""".stripMargin
)
case _ => // Analysis successful!
}
}
checkCollectedMetrics(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
case _ =>
}
plan.setAnalyzed()
}
private def getAllExpressions(plan: LogicalPlan): Seq[Expression] = {
plan match {
// `groupingExpressions` may rely on `aggregateExpressions`, due to the GROUP BY alias
// feature. We should check errors in `aggregateExpressions` first.
case a: Aggregate => a.aggregateExpressions ++ a.groupingExpressions
case _ => plan.expressions
}
}
private def getDataTypesAreCompatibleFn(plan: LogicalPlan): (DataType, DataType) => Boolean = {
val isUnion = plan.isInstanceOf[Union]
if (isUnion) {
(dt1: DataType, dt2: DataType) =>
DataType.equalsStructurally(dt1, dt2, true)
} else {
// SPARK-18058: we shall not care about the nullability of columns
(dt1: DataType, dt2: DataType) =>
TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).nonEmpty
}
}
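  // Note the asymmetry above: Union requires structurally equal types (ignoring
  // nullability), while Intersect/Except only require that a wider common type exists
  // (e.g. int vs. double would be accepted).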
private def getDefaultTypeCoercionPlan(plan: LogicalPlan): LogicalPlan =
TypeCoercion.typeCoercionRules.foldLeft(plan) { case (p, rule) => rule(p) }
private def extraHintMessage(issueFixedIfAnsiOff: Boolean): String = {
if (issueFixedIfAnsiOff) {
"\nTo fix the error, you might need to add explicit type casts. If necessary set " +
s"${SQLConf.ANSI_ENABLED.key} to false to bypass this error."
} else {
""
}
}
private[analysis] def extraHintForAnsiTypeCoercionExpression(plan: LogicalPlan): String = {
if (!SQLConf.get.ansiEnabled) {
""
} else {
val nonAnsiPlan = getDefaultTypeCoercionPlan(plan)
var issueFixedIfAnsiOff = true
getAllExpressions(nonAnsiPlan).foreach(_.foreachUp {
case e: Expression if e.getTagValue(DATA_TYPE_MISMATCH_ERROR).contains(true) &&
e.checkInputDataTypes().isFailure =>
e.checkInputDataTypes() match {
case TypeCheckResult.TypeCheckFailure(_) | _: TypeCheckResult.DataTypeMismatch =>
issueFixedIfAnsiOff = false
}
case _ =>
})
extraHintMessage(issueFixedIfAnsiOff)
}
}
private def extraHintForAnsiTypeCoercionPlan(plan: LogicalPlan): String = {
if (!SQLConf.get.ansiEnabled) {
""
} else {
val nonAnsiPlan = getDefaultTypeCoercionPlan(plan)
var issueFixedIfAnsiOff = true
nonAnsiPlan match {
case _: Union | _: SetOperation if nonAnsiPlan.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
val ref = dataTypes(nonAnsiPlan.children.head)
val dataTypesAreCompatibleFn = getDataTypesAreCompatibleFn(nonAnsiPlan)
nonAnsiPlan.children.tail.zipWithIndex.foreach { case (child, ti) =>
// Check if the data types match.
dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
if (!dataTypesAreCompatibleFn(dt1, dt2)) {
issueFixedIfAnsiOff = false
}
}
}
case _ =>
}
extraHintMessage(issueFixedIfAnsiOff)
}
}
private def scrubOutIds(string: String): String =
string.replaceAll("#\\d+", "#x")
.replaceAll("operator id = \\d+", "operator id = #x")
private def planToString(plan: LogicalPlan): String = {
if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString
}
private def exprsToString(exprs: Seq[Expression]): String = {
val result = exprs.map(_.toString).mkString("\n")
if (Utils.isTesting) scrubOutIds(result) else result
}
/**
   * Validates subquery expressions in the plan. Upon failure, throws a user-facing error.
*/
def checkSubqueryExpression(plan: LogicalPlan, expr: SubqueryExpression): Unit = {
def checkAggregateInScalarSubquery(
conditions: Seq[Expression],
query: LogicalPlan, agg: Aggregate): Unit = {
// Make sure correlated scalar subqueries contain one row for every outer row by
// enforcing that they are aggregates containing exactly one aggregate expression.
val aggregates = agg.expressions.flatMap(_.collect {
case a: AggregateExpression => a
})
if (aggregates.isEmpty) {
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT",
messageParameters = Map.empty)
}
// SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
// are not part of the correlated columns.
val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references))
// Collect the local references from the correlated predicate in the subquery.
val subqueryColumns = getCorrelatedPredicates(query).flatMap(_.references)
.filterNot(conditions.flatMap(_.references).contains)
val correlatedCols = AttributeSet(subqueryColumns)
val invalidCols = groupByCols -- correlatedCols
// GROUP BY columns must be a subset of columns in the predicates
if (invalidCols.nonEmpty) {
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"NON_CORRELATED_COLUMNS_IN_GROUP_BY",
messageParameters = Map("value" -> invalidCols.map(_.name).mkString(",")))
}
}
// Skip subquery aliases added by the Analyzer.
// For projects, do the necessary mapping and skip to its child.
@scala.annotation.tailrec
def cleanQueryInScalarSubquery(p: LogicalPlan): LogicalPlan = p match {
case s: SubqueryAlias => cleanQueryInScalarSubquery(s.child)
case p: Project => cleanQueryInScalarSubquery(p.child)
case child => child
}
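    // e.g. for SELECT (SELECT max(b) FROM t2 WHERE t2.a = t1.a) FROM t1 (hypothetical
    // tables), the scalar subquery arrives wrapped in SubqueryAlias/Project nodes; stripping
    // them above exposes the Aggregate so checkAggregateInScalarSubquery can inspect it.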
    // Check whether the given expressions contain the subquery expression.
def containsExpr(expressions: Seq[Expression]): Boolean = {
expressions.exists(_.exists(_.semanticEquals(expr)))
}
def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = p match {
case f: Filter =>
if (hasOuterReferences(expr.plan)) {
expr.plan.expressions.foreach(_.foreachUp {
case o: OuterReference =>
p.children.foreach(e =>
if (!e.output.exists(_.exprId == o.exprId)) {
o.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"CORRELATED_COLUMN_NOT_FOUND",
messageParameters = Map("value" -> o.name))
})
case _ =>
})
}
case _ =>
}
// Validate the subquery plan.
checkAnalysis(expr.plan)
// Check if there is outer attribute that cannot be found from the plan.
checkOuterReference(plan, expr)
expr match {
case ScalarSubquery(query, outerAttrs, _, _) =>
// Scalar subquery must return one column as output.
if (query.output.size != 1) {
expr.failAnalysis(
errorClass = "INVALID_SUBQUERY_EXPRESSION." +
"SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN",
messageParameters = Map("number" -> query.output.size.toString))
}
if (outerAttrs.nonEmpty) {
cleanQueryInScalarSubquery(query) match {
case a: Aggregate => checkAggregateInScalarSubquery(outerAttrs, query, a)
case Filter(_, a: Aggregate) => checkAggregateInScalarSubquery(outerAttrs, query, a)
case p: LogicalPlan if p.maxRows.exists(_ <= 1) => // Ok
case other =>
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(other)))
}
          // Only certain operators are allowed to host subquery expressions containing
          // outer references.
plan match {
case _: Filter | _: Project | _: SupportsSubquery => // Ok
case a: Aggregate =>
// If the correlated scalar subquery is in the grouping expressions of an Aggregate,
// it must also be in the aggregate expressions to be rewritten in the optimization
// phase.
if (containsExpr(a.groupingExpressions) && !containsExpr(a.aggregateExpressions)) {
a.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(a)))
}
case other =>
other.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(other)))
}
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
checkCorrelationsInSubquery(expr.plan, isScalar = true)
case _: LateralSubquery =>
assert(plan.isInstanceOf[LateralJoin])
val join = plan.asInstanceOf[LateralJoin]
// A lateral join with a multi-row outer query and a non-deterministic lateral subquery
// cannot be decorrelated. Otherwise it may produce incorrect results.
if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) {
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"NON_DETERMINISTIC_LATERAL_SUBQUERIES",
messageParameters = Map("treeNode" -> planToString(plan)))
}
// Check if the lateral join's join condition is deterministic.
if (join.condition.exists(!_.deterministic)) {
join.condition.get.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"LATERAL_JOIN_CONDITION_NON_DETERMINISTIC",
messageParameters = Map("condition" -> join.condition.get.sql))
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
checkCorrelationsInSubquery(expr.plan, isLateral = true)
case inSubqueryOrExistsSubquery =>
plan match {
case _: Filter | _: SupportsSubquery | _: Join |
_: Project | _: Aggregate | _: Window => // Ok
case _ =>
expr.failAnalysis(
errorClass =
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_IN_EXISTS_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(plan)))
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
checkCorrelationsInSubquery(expr.plan)
}
}
/**
   * Validate that collected metrics names are unique. The same name cannot be used for metrics
   * with different results. However, multiple instances of metrics with the same result and name
   * are allowed (e.g. self-joins).
*/
private def checkCollectedMetrics(plan: LogicalPlan): Unit = {
val metricsMap = mutable.Map.empty[String, LogicalPlan]
def check(plan: LogicalPlan): Unit = plan.foreach { node =>
node match {
case metrics @ CollectMetrics(name, _, _) =>
metricsMap.get(name) match {
case Some(other) =>
// Exact duplicates are allowed. They can be the result
// of a CTE that is used multiple times or a self join.
if (!metrics.sameResult(other)) {
failAnalysis(
s"Multiple definitions of observed metrics named '$name': $plan")
}
case None =>
metricsMap.put(name, metrics)
}
case _ =>
}
node.expressions.foreach(_.foreach {
case subquery: SubqueryExpression =>
check(subquery.plan)
case _ =>
})
}
check(plan)
}
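  // Illustrative: unioning df.observe("m", count(lit(1))) with df.observe("m", avg($"a"))
  // is rejected because the two "m" metrics have different results, while exact duplicates
  // of the same observed subtree (e.g. a self-join) pass the sameResult check above.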
/**
* Validates to make sure the outer references appearing inside the subquery
* are allowed.
*/
private def checkCorrelationsInSubquery(
sub: LogicalPlan,
isScalar: Boolean = false,
isLateral: Boolean = false): Unit = {
    // Validate that correlated aggregate expressions do not contain a mixture
    // of outer and local references.
def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
if (a.references.nonEmpty) {
a.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
messageParameters = Map("function" -> a.sql))
}
case _ =>
}
}
// Make sure expressions of a plan do not contain outer references.
def failOnOuterReferenceInPlan(p: LogicalPlan): Unit = {
if (p.expressions.exists(containsOuter)) {
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
messageParameters = Map("treeNode" -> planToString(p)))
}
}
// Check whether the logical plan node can host outer references.
// A `Project` can host outer references if it is inside a scalar or a lateral subquery and
    // DecorrelateInnerQuery is enabled. Otherwise, only Filter can host outer references.
def canHostOuter(plan: LogicalPlan): Boolean = plan match {
case _: Filter => true
case _: Project => (isScalar || isLateral) && SQLConf.get.decorrelateInnerQueryEnabled
case _ => false
}
    // Make sure a plan's expressions do not contain:
    // 1. Aggregate expressions that have a mixture of outer and local references.
    // 2. Expressions containing outer references on plan nodes other than the allowed operators.
def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
if (!canHostOuter(p) && p.expressions.exists(containsOuter)) {
p.failAnalysis(
errorClass =
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE",
messageParameters = Map("treeNode" -> planToString(p)))
}
}
// SPARK-17348: A potential incorrect result case.
// When a correlated predicate is a non-equality predicate,
// certain operators are not permitted from the operator
// hosting the correlated predicate up to the operator on the outer table.
// Otherwise, the pull up of the correlated predicate
// will generate a plan with a different semantics
// which could return incorrect result.
// Currently we check for Aggregate and Window operators
//
// Below shows an example of a Logical Plan during Analyzer phase that
// show this problem. Pulling the correlated predicate [outer(c2#77) >= ..]
// through the Aggregate (or Window) operator could alter the result of
// the Aggregate.
//
    //        Project [c1#76]
    //        +- Project [c1#87, c2#88]
    //        :  (Aggregate or Window operator)
    //        :  +- Filter [outer(c2#77) >= c2#88]
    //        :     +- SubqueryAlias t2, `t2`
    //        :        +- Project [_1#84 AS c1#87, _2#85 AS c2#88]
    //        :           +- LocalRelation [_1#84, _2#85]
    //        +- SubqueryAlias t1, `t1`
    //           +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
    //              +- LocalRelation [_1#73, _2#74]
// SPARK-35080: The same issue can happen to correlated equality predicates when
// they do not guarantee one-to-one mapping between inner and outer attributes.
// For example:
// Table:
// t1(a, b): [(0, 6), (1, 5), (2, 4)]
// t2(c): [(6)]
//
// Query:
// SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
//
    // Original subquery plan:
    //   Aggregate [count(1)]
    //   +- Filter ((a + b) = outer(c))
    //      +- LocalRelation [a, b]
    //
    // Plan after pulling up correlated predicates:
    //   Aggregate [a, b] [count(1), a, b]
    //   +- LocalRelation [a, b]
    //
    // Plan after rewrite:
    //   Project [c1, count(1)]
    //   +- Join LeftOuter ((a + b) = c)
    //      :- LocalRelation [c]
    //      +- Aggregate [a, b] [count(1), a, b]
    //         +- LocalRelation [a, b]
    //
    // The right hand side of the join transformed from the subquery will output
    //   count(1) | a | b
    //   1        | 0 | 6
    //   1        | 1 | 5
    //   1        | 2 | 4
    // and the plan after rewrite will give the original query incorrect results.
def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
if (predicates.nonEmpty) {
// Report a non-supported case as an exception
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
"CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
messageParameters =
Map("treeNode" -> s"${exprsToString(predicates)}\n${planToString(p)}"))
}
}
// Recursively check invalid outer references in the plan.
def checkPlan(
plan: LogicalPlan,
aggregated: Boolean = false,
canContainOuter: Boolean = true): Unit = {
if (!canContainOuter) {
failOnOuterReferenceInPlan(plan)
}
// Approve operators allowed in a correlated subquery
// There are 4 categories:
// 1. Operators that are allowed anywhere in a correlated subquery, and,
// by definition of the operators, they either do not contain
// any columns or cannot host outer references.
// 2. Operators that are allowed anywhere in a correlated subquery
// so long as they do not host outer references.
// 3. Operators that need special handling. These operators are
// Filter, Join, Aggregate, and Generate.
//
// Any operators that are not in the above list are allowed
// in a correlated subquery only if they are not on a correlation path.
      // In other words, these operators are allowed only under a correlation point.
//
// A correlation path is defined as the sub-tree of all the operators that
// are on the path from the operator hosting the correlated expressions
// up to the operator producing the correlated values.
plan match {
// Category 1:
// ResolvedHint, LeafNode, Repartition, and SubqueryAlias
case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _: SubqueryAlias) =>
p.children.foreach(child => checkPlan(child, aggregated, canContainOuter))
// Category 2:
// These operators can be anywhere in a correlated subquery.
// so long as they do not host outer references in the operators.
case p: Project =>
failOnInvalidOuterReference(p)
checkPlan(p.child, aggregated, canContainOuter)
case s: Sort =>
failOnInvalidOuterReference(s)
checkPlan(s.child, aggregated, canContainOuter)
case r: RepartitionByExpression =>
failOnInvalidOuterReference(r)
checkPlan(r.child, aggregated, canContainOuter)
case l: LateralJoin =>
failOnInvalidOuterReference(l)
checkPlan(l.child, aggregated, canContainOuter)
// Category 3:
// Filter is one of the two operators allowed to host correlated expressions.
// The other operator is Join. Filter can be anywhere in a correlated subquery.
case f: Filter =>
failOnInvalidOuterReference(f)
val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter)
val unsupportedPredicates = correlated.filterNot(DecorrelateInnerQuery.canPullUpOverAgg)
if (aggregated) {
failOnUnsupportedCorrelatedPredicate(unsupportedPredicates, f)
}
checkPlan(f.child, aggregated, canContainOuter)
// Aggregate cannot host any correlated expressions
// It can be on a correlation path if the correlation contains
// only supported correlated equality predicates.
// It cannot be on a correlation path if the correlation has
// non-equality correlated predicates.
case a: Aggregate =>
failOnInvalidOuterReference(a)
checkPlan(a.child, aggregated = true, canContainOuter)
// Distinct does not host any correlated expressions, but during the optimization phase
// it will be rewritten as Aggregate, which can only be on a correlation path if the
// correlation contains only the supported correlated equality predicates.
// Only block it for lateral subqueries because scalar subqueries must be aggregated
// and it does not impact the results for IN/EXISTS subqueries.
case d: Distinct =>
checkPlan(d.child, aggregated = isLateral, canContainOuter)
// Join can host correlated expressions.
case j @ Join(left, right, joinType, _, _) =>
failOnInvalidOuterReference(j)
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
j.children.foreach(child => checkPlan(child, aggregated, canContainOuter))
// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
            // Note that ExistenceJoin cannot be expressed externally in either SQL or the
            // DataFrame API, so it should not show up here in the analysis phase. This is
            // just a safety net.
//
// LeftSemi does not allow output from the right operand.
// Any correlated references in the subplan
// of the right operand cannot be pulled up.
case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
checkPlan(left, aggregated, canContainOuter)
checkPlan(right, aggregated, canContainOuter = false)
// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
checkPlan(left, aggregated, canContainOuter = false)
checkPlan(right, aggregated, canContainOuter)
// Any other join types not explicitly listed above,
// including Full outer join, are treated as Category 4.
case _ =>
j.children.foreach(child => checkPlan(child, aggregated, canContainOuter = false))
}
        // A Generate with join=true, i.e. one expressed with LATERAL VIEW [OUTER],
        // behaves like an inner join: correlation is allowed under it,
        // but the Generate itself must not host any outer references.
// Note:
// Generator with requiredChildOutput.isEmpty is treated as Category 4.
case g: Generate if g.requiredChildOutput.nonEmpty =>
failOnInvalidOuterReference(g)
checkPlan(g.child, aggregated, canContainOuter)
// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed only
// under a correlation point but they and their descendant operators
// are not allowed to have any correlated expressions.
case p =>
p.children.foreach(p => checkPlan(p, aggregated, canContainOuter = false))
}
}
// Simplify the predicates before validating any unsupported correlation patterns in the plan.
AnalysisHelper.allowInvokingTransformsInAnalyzer {
checkPlan(BooleanSimplification(sub))
}
}
/**
* Validates the options used for alter table commands after table and columns are resolved.
*/
private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = {
if (struct.findNestedField(
fieldNames, includeCollections = true, alter.conf.resolver).isDefined) {
alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " +
s"already exists in ${struct.treeString}")
}
}
def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = {
SchemaUtils.checkColumnNameDuplication(
colsToAdd.map(_.name.quoted),
"in the user specified columns",
alter.conf.resolver)
}
alter match {
case AddColumns(table: ResolvedTable, colsToAdd) =>
colsToAdd.foreach { colToAdd =>
checkColumnNotExists("add", colToAdd.name, table.schema)
}
checkColumnNameDuplication(colsToAdd)
case ReplaceColumns(_: ResolvedTable, colsToAdd) =>
checkColumnNameDuplication(colsToAdd)
case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
checkColumnNotExists("rename", col.path :+ newName, table.schema)
case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _, _) =>
val fieldName = col.name.quoted
if (a.dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
.map(dt => col.field.copy(dataType = dt))
.getOrElse(col.field)
val newDataType = a.dataType.get
newDataType match {
case _: StructType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
"update a struct by updating its fields")
case _: MapType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a map by updating $fieldName.key or $fieldName.value")
case _: ArrayType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update the element by updating $fieldName.element")
case u: UserDefinedType[_] =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
s"update a UserDefinedType[${u.sql}] by updating its fields")
case _: CalendarIntervalType | _: AnsiIntervalType =>
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to interval type")
case _ => // update is okay
}
          // We don't need to handle nested types here; they will already have failed the checks above.
def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
case _ => Cast.canUpCast(from, to)
}
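          // Illustrative: widening varchar(5) to varchar(10) or int to bigint passes
          // canAlterColumnType (a safe up-cast), while narrowing bigint to int is rejected.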
if (!canAlterColumnType(field.dataType, newDataType)) {
alter.failAnalysis(s"Cannot update ${table.name} field $fieldName: " +
s"${field.dataType.simpleString} cannot be cast to ${newDataType.simpleString}")
}
}
if (a.nullable.isDefined) {
if (!a.nullable.get && col.field.nullable) {
alter.failAnalysis(s"Cannot change nullable column to non-nullable: $fieldName")
}
}
case _ =>
}
}
}
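Usage example
A minimal sketch of how these checks surface to callers (assuming a local SparkSession; the view name t and its columns are made up for illustration). Errors raised by CheckAnalysis reach user code as AnalysisException:
import org.apache.spark.sql.{AnalysisException, SparkSession}

object CheckAnalysisDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("check-analysis-demo").getOrCreate()
    import spark.implicits._
    Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("t")
    try {
      // Rejected by the Aggregate check: 'name' is neither grouped nor aggregated.
      spark.sql("SELECT id, name FROM t GROUP BY id").collect()
    } catch {
      case e: AnalysisException => println(s"Analysis failed as expected: ${e.getMessage}")
    }
    spark.stop()
  }
}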