spark v2Commands 源码
spark v2Commands 代码
文件路径:/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationTable, Write}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
/**
* Base trait for DataSourceV2 write commands
*/
trait V2WriteCommand extends UnaryCommand {
def table: NamedRelation
def query: LogicalPlan
def isByName: Boolean
override def child: LogicalPlan = query
override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved
def outputResolved: Boolean = {
assert(table.resolved && query.resolved,
"`outputResolved` can only be called when `table` and `query` are both resolved.")
// If the table doesn't require schema match, we don't need to resolve the output columns.
table.skipSchemaResolution || (query.output.size == table.output.size &&
query.output.zip(table.output).forall {
case (inAttr, outAttr) =>
val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
// names and types must match, nullability must be compatible
inAttr.name == outAttr.name &&
DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
(outAttr.nullable || !inAttr.nullable)
})
}
def withNewQuery(newQuery: LogicalPlan): V2WriteCommand
def withNewTable(newTable: NamedRelation): V2WriteCommand
}
trait V2PartitionCommand extends UnaryCommand {
def table: LogicalPlan
def allowPartialPartitionSpec: Boolean = false
override def child: LogicalPlan = table
}
/**
* Append data to an existing table.
*/
case class AppendData(
table: NamedRelation,
query: LogicalPlan,
writeOptions: Map[String, String],
isByName: Boolean,
write: Option[Write] = None) extends V2WriteCommand {
override def withNewQuery(newQuery: LogicalPlan): AppendData = copy(query = newQuery)
override def withNewTable(newTable: NamedRelation): AppendData = copy(table = newTable)
override protected def withNewChildInternal(newChild: LogicalPlan): AppendData =
copy(query = newChild)
}
object AppendData {
def byName(
table: NamedRelation,
df: LogicalPlan,
writeOptions: Map[String, String] = Map.empty): AppendData = {
new AppendData(table, df, writeOptions, isByName = true)
}
def byPosition(
table: NamedRelation,
query: LogicalPlan,
writeOptions: Map[String, String] = Map.empty): AppendData = {
new AppendData(table, query, writeOptions, isByName = false)
}
}
/**
* Overwrite data matching a filter in an existing table.
*/
case class OverwriteByExpression(
table: NamedRelation,
deleteExpr: Expression,
query: LogicalPlan,
writeOptions: Map[String, String],
isByName: Boolean,
write: Option[Write] = None) extends V2WriteCommand {
override lazy val resolved: Boolean = {
table.resolved && query.resolved && outputResolved && deleteExpr.resolved
}
override def inputSet: AttributeSet = AttributeSet(table.output)
override def withNewQuery(newQuery: LogicalPlan): OverwriteByExpression = {
copy(query = newQuery)
}
override def withNewTable(newTable: NamedRelation): OverwriteByExpression = {
copy(table = newTable)
}
override protected def withNewChildInternal(newChild: LogicalPlan): OverwriteByExpression =
copy(query = newChild)
}
object OverwriteByExpression {
def byName(
table: NamedRelation,
df: LogicalPlan,
deleteExpr: Expression,
writeOptions: Map[String, String] = Map.empty): OverwriteByExpression = {
OverwriteByExpression(table, deleteExpr, df, writeOptions, isByName = true)
}
def byPosition(
table: NamedRelation,
query: LogicalPlan,
deleteExpr: Expression,
writeOptions: Map[String, String] = Map.empty): OverwriteByExpression = {
OverwriteByExpression(table, deleteExpr, query, writeOptions, isByName = false)
}
}
/**
* Dynamically overwrite partitions in an existing table.
*/
case class OverwritePartitionsDynamic(
table: NamedRelation,
query: LogicalPlan,
writeOptions: Map[String, String],
isByName: Boolean,
write: Option[Write] = None) extends V2WriteCommand {
override def withNewQuery(newQuery: LogicalPlan): OverwritePartitionsDynamic = {
copy(query = newQuery)
}
override def withNewTable(newTable: NamedRelation): OverwritePartitionsDynamic = {
copy(table = newTable)
}
override protected def withNewChildInternal(newChild: LogicalPlan): OverwritePartitionsDynamic =
copy(query = newChild)
}
object OverwritePartitionsDynamic {
def byName(
table: NamedRelation,
df: LogicalPlan,
writeOptions: Map[String, String] = Map.empty): OverwritePartitionsDynamic = {
OverwritePartitionsDynamic(table, df, writeOptions, isByName = true)
}
def byPosition(
table: NamedRelation,
query: LogicalPlan,
writeOptions: Map[String, String] = Map.empty): OverwritePartitionsDynamic = {
OverwritePartitionsDynamic(table, query, writeOptions, isByName = false)
}
}
trait RowLevelWrite extends V2WriteCommand with SupportsSubquery {
def operation: RowLevelOperation
def condition: Expression
def originalTable: NamedRelation
}
/**
* Replace groups of data in an existing table during a row-level operation.
*
* This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
* that can replace groups of data (e.g. files, partitions).
*
* @param table a plan that references a row-level operation table
* @param condition a condition that defines matching groups
* @param query a query with records that should replace the records that were read
* @param originalTable a plan for the original table for which the row-level command was triggered
* @param write a logical write, if already constructed
*/
case class ReplaceData(
table: NamedRelation,
condition: Expression,
query: LogicalPlan,
originalTable: NamedRelation,
write: Option[Write] = None) extends RowLevelWrite {
override val isByName: Boolean = false
override val stringArgs: Iterator[Any] = Iterator(table, query, write)
override lazy val references: AttributeSet = query.outputSet
lazy val operation: RowLevelOperation = {
EliminateSubqueryAliases(table) match {
case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
operation
case _ =>
throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
}
}
// the incoming query may include metadata columns
lazy val dataInput: Seq[Attribute] = {
query.output.filter {
case MetadataAttribute(_) => false
case _ => true
}
}
override def outputResolved: Boolean = {
assert(table.resolved && query.resolved,
"`outputResolved` can only be called when `table` and `query` are both resolved.")
// take into account only incoming data columns and ignore metadata columns in the query
// they will be discarded after the logical write is built in the optimizer
// metadata columns may be needed to request a correct distribution or ordering
// but are not passed back to the data source during writes
table.skipSchemaResolution || (dataInput.size == table.output.size &&
dataInput.zip(table.output).forall { case (inAttr, outAttr) =>
val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
// names and types must match, nullability must be compatible
inAttr.name == outAttr.name &&
DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
(outAttr.nullable || !inAttr.nullable)
})
}
override def withNewQuery(newQuery: LogicalPlan): ReplaceData = copy(query = newQuery)
override def withNewTable(newTable: NamedRelation): ReplaceData = copy(table = newTable)
override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceData = {
copy(query = newChild)
}
}
/** A trait used for logical plan nodes that create or replace V2 table definitions. */
trait V2CreateTablePlan extends LogicalPlan {
def tableName: Identifier
def partitioning: Seq[Transform]
def tableSchema: StructType
/**
* Creates a copy of this node with the new partitioning transforms. This method is used to
* rewrite the partition transforms normalized according to the table schema.
*/
def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan
}
/**
* Create a new table with a v2 catalog.
*/
case class CreateTable(
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
tableSpec: TableSpec,
ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
override def tableName: Identifier = {
assert(child.resolved)
child.asInstanceOf[ResolvedIdentifier].identifier
}
override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
copy(name = newChild)
override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}
}
/**
* Create a new table from a select query with a v2 catalog.
*/
case class CreateTableAsSelect(
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
tableSpec: TableSpec,
writeOptions: Map[String, String],
ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan {
override def tableSchema: StructType = query.schema
override def left: LogicalPlan = name
override def right: LogicalPlan = query
override def tableName: Identifier = {
assert(left.resolved)
left.asInstanceOf[ResolvedIdentifier].identifier
}
override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
// that the columns referenced by the table's partitioning exist in the query schema
val references = partitioning.flatMap(_.references).toSet
references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
}
override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}
override protected def withNewChildrenInternal(
newLeft: LogicalPlan,
newRight: LogicalPlan
): CreateTableAsSelect =
copy(name = newLeft, query = newRight)
}
/**
* Replace a table with a v2 catalog.
*
* If the table does not exist, and orCreate is true, then it will be created.
* If the table does not exist, and orCreate is false, then an exception will be thrown.
*
* The persisted table will have no contents as a result of this operation.
*/
case class ReplaceTable(
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
tableSpec: TableSpec,
orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
override def tableName: Identifier = {
assert(child.resolved)
child.asInstanceOf[ResolvedIdentifier].identifier
}
override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
copy(name = newChild)
override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}
}
/**
* Replaces a table from a select query with a v2 catalog.
*
* If the table does not exist, and orCreate is true, then it will be created.
* If the table does not exist, and orCreate is false, then an exception will be thrown.
*/
case class ReplaceTableAsSelect(
name: LogicalPlan,
partitioning: Seq[Transform],
query: LogicalPlan,
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean) extends BinaryCommand with V2CreateTablePlan {
override def tableSchema: StructType = query.schema
override def left: LogicalPlan = name
override def right: LogicalPlan = query
override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
// that the columns referenced by the table's partitioning exist in the query schema
val references = partitioning.flatMap(_.references).toSet
references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
}
override def tableName: Identifier = {
assert(name.resolved)
name.asInstanceOf[ResolvedIdentifier].identifier
}
override protected def withNewChildrenInternal(
newLeft: LogicalPlan,
newRight: LogicalPlan): LogicalPlan =
copy(name = newLeft, query = newRight)
override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}
}
/**
* The logical plan of the CREATE NAMESPACE command.
*/
case class CreateNamespace(
name: LogicalPlan,
ifNotExists: Boolean,
properties: Map[String, String]) extends UnaryCommand {
override def child: LogicalPlan = name
override protected def withNewChildInternal(newChild: LogicalPlan): CreateNamespace =
copy(name = newChild)
}
/**
* The logical plan of the DROP NAMESPACE command.
*/
case class DropNamespace(
namespace: LogicalPlan,
ifExists: Boolean,
cascade: Boolean) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(namespace = newChild)
}
/**
* The logical plan of the DESCRIBE NAMESPACE command.
*/
case class DescribeNamespace(
namespace: LogicalPlan,
extended: Boolean,
override val output: Seq[Attribute] = DescribeNamespace.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): DescribeNamespace =
copy(namespace = newChild)
}
object DescribeNamespace {
def getOutputAttrs: Seq[Attribute] = Seq(
AttributeReference("info_name", StringType, nullable = false,
new MetadataBuilder().putString("comment", "name of the namespace info").build())(),
AttributeReference("info_value", StringType, nullable = true,
new MetadataBuilder().putString("comment", "value of the namespace info").build())())
}
/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET (DBPROPERTIES|PROPERTIES)
* command.
*/
case class SetNamespaceProperties(
namespace: LogicalPlan,
properties: Map[String, String]) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceProperties =
copy(namespace = newChild)
}
/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command.
*/
case class SetNamespaceLocation(
namespace: LogicalPlan,
location: String) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceLocation =
copy(namespace = newChild)
}
/**
* The logical plan of the SHOW NAMESPACES command.
*/
case class ShowNamespaces(
namespace: LogicalPlan,
pattern: Option[String],
override val output: Seq[Attribute] = ShowNamespaces.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): ShowNamespaces =
copy(namespace = newChild)
}
object ShowNamespaces {
def getOutputAttrs: Seq[Attribute] = {
Seq(AttributeReference("namespace", StringType, nullable = false)())
}
}
/**
* The logical plan of the DESCRIBE relation_name command.
*/
case class DescribeRelation(
relation: LogicalPlan,
partitionSpec: TablePartitionSpec,
isExtended: Boolean,
override val output: Seq[Attribute] = DescribeRelation.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = relation
override protected def withNewChildInternal(newChild: LogicalPlan): DescribeRelation =
copy(relation = newChild)
}
object DescribeRelation {
def getOutputAttrs: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes()
}
/**
* The logical plan of the DESCRIBE relation_name col_name command.
*/
case class DescribeColumn(
relation: LogicalPlan,
column: Expression,
isExtended: Boolean,
override val output: Seq[Attribute] = DescribeColumn.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = relation
override protected def withNewChildInternal(newChild: LogicalPlan): DescribeColumn =
copy(relation = newChild)
}
object DescribeColumn {
def getOutputAttrs: Seq[Attribute] = DescribeCommandSchema.describeColumnAttributes()
}
/**
* The logical plan of the DELETE FROM command.
*/
case class DeleteFromTable(
table: LogicalPlan,
condition: Expression) extends UnaryCommand with SupportsSubquery {
override def child: LogicalPlan = table
override protected def withNewChildInternal(newChild: LogicalPlan): DeleteFromTable =
copy(table = newChild)
}
/**
* The logical plan of the DELETE FROM command that can be executed using data source filters.
*
* As opposed to [[DeleteFromTable]], this node represents a DELETE operation where the condition
* was converted into filters and the data source reported that it can handle all of them.
*/
case class DeleteFromTableWithFilters(
table: LogicalPlan,
condition: Seq[Predicate]) extends LeafCommand
/**
* The logical plan of the UPDATE TABLE command.
*/
case class UpdateTable(
table: LogicalPlan,
assignments: Seq[Assignment],
condition: Option[Expression]) extends UnaryCommand with SupportsSubquery {
override def child: LogicalPlan = table
override protected def withNewChildInternal(newChild: LogicalPlan): UpdateTable =
copy(table = newChild)
def skipSchemaResolution: Boolean = table match {
case r: NamedRelation => r.skipSchemaResolution
case SubqueryAlias(_, r: NamedRelation) => r.skipSchemaResolution
case _ => false
}
}
/**
* The logical plan of the MERGE INTO command.
*/
case class MergeIntoTable(
targetTable: LogicalPlan,
sourceTable: LogicalPlan,
mergeCondition: Expression,
matchedActions: Seq[MergeAction],
notMatchedActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery {
def duplicateResolved: Boolean = targetTable.outputSet.intersect(sourceTable.outputSet).isEmpty
def skipSchemaResolution: Boolean = targetTable match {
case r: NamedRelation => r.skipSchemaResolution
case SubqueryAlias(_, r: NamedRelation) => r.skipSchemaResolution
case _ => false
}
override def left: LogicalPlan = targetTable
override def right: LogicalPlan = sourceTable
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
copy(targetTable = newLeft, sourceTable = newRight)
}
sealed abstract class MergeAction extends Expression with Unevaluable {
def condition: Option[Expression]
override def nullable: Boolean = false
override def dataType: DataType = throw new UnresolvedException("nullable")
override def children: Seq[Expression] = condition.toSeq
}
case class DeleteAction(condition: Option[Expression]) extends MergeAction {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): DeleteAction =
copy(condition = if (condition.isDefined) Some(newChildren(0)) else None)
}
case class UpdateAction(
condition: Option[Expression],
assignments: Seq[Assignment]) extends MergeAction {
override def children: Seq[Expression] = condition.toSeq ++ assignments
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): UpdateAction =
copy(
condition = if (condition.isDefined) Some(newChildren.head) else None,
assignments = newChildren.takeRight(assignments.length).asInstanceOf[Seq[Assignment]])
}
case class UpdateStarAction(condition: Option[Expression]) extends MergeAction {
override def children: Seq[Expression] = condition.toSeq
override lazy val resolved = false
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): UpdateStarAction =
copy(condition = if (condition.isDefined) Some(newChildren(0)) else None)
}
case class InsertAction(
condition: Option[Expression],
assignments: Seq[Assignment]) extends MergeAction {
override def children: Seq[Expression] = condition.toSeq ++ assignments
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): InsertAction =
copy(
condition = if (condition.isDefined) Some(newChildren.head) else None,
assignments = newChildren.takeRight(assignments.length).asInstanceOf[Seq[Assignment]])
}
case class InsertStarAction(condition: Option[Expression]) extends MergeAction {
override def children: Seq[Expression] = condition.toSeq
override lazy val resolved = false
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): InsertStarAction =
copy(condition = if (condition.isDefined) Some(newChildren(0)) else None)
}
case class Assignment(key: Expression, value: Expression) extends Expression
with Unevaluable with BinaryLike[Expression] {
override def nullable: Boolean = false
override def dataType: DataType = throw new UnresolvedException("nullable")
override def left: Expression = key
override def right: Expression = value
override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Assignment = copy(key = newLeft, value = newRight)
}
/**
* The logical plan of the DROP TABLE command.
*
* If the `PURGE` option is set, the table catalog must remove table data by skipping the trash
* even when the catalog has configured one. The option is applicable only for managed tables.
*
* The syntax of this command is:
* {{{
* DROP TABLE [IF EXISTS] table [PURGE];
* }}}
*/
case class DropTable(
child: LogicalPlan,
ifExists: Boolean,
purge: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): DropTable =
copy(child = newChild)
}
/**
* The logical plan for no-op command handling non-existing table.
*/
case class NoopCommand(
commandName: String,
multipartIdentifier: Seq[String]) extends LeafCommand
/**
* The logical plan of the ALTER [TABLE|VIEW] ... RENAME TO command.
*/
case class RenameTable(
child: LogicalPlan,
newName: Seq[String],
isView: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): RenameTable =
copy(child = newChild)
}
/**
* The logical plan of the SHOW TABLES command.
*/
case class ShowTables(
namespace: LogicalPlan,
pattern: Option[String],
override val output: Seq[Attribute] = ShowTables.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): ShowTables =
copy(namespace = newChild)
}
object ShowTables {
def getOutputAttrs: Seq[Attribute] = Seq(
AttributeReference("namespace", StringType, nullable = false)(),
AttributeReference("tableName", StringType, nullable = false)(),
AttributeReference("isTemporary", BooleanType, nullable = false)())
}
/**
* The logical plan of the SHOW TABLE EXTENDED command.
*/
case class ShowTableExtended(
namespace: LogicalPlan,
pattern: String,
partitionSpec: Option[PartitionSpec],
override val output: Seq[Attribute] = ShowTableExtended.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): ShowTableExtended =
copy(namespace = newChild)
}
object ShowTableExtended {
def getOutputAttrs: Seq[Attribute] = Seq(
AttributeReference("namespace", StringType, nullable = false)(),
AttributeReference("tableName", StringType, nullable = false)(),
AttributeReference("isTemporary", BooleanType, nullable = false)(),
AttributeReference("information", StringType, nullable = false)())
}
/**
* The logical plan of the SHOW VIEWS command.
*
* Notes: v2 catalogs do not support views API yet, the command will fallback to
* v1 ShowViewsCommand during ResolveSessionCatalog.
*/
case class ShowViews(
namespace: LogicalPlan,
pattern: Option[String],
override val output: Seq[Attribute] = ShowViews.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): ShowViews =
copy(namespace = newChild)
}
object ShowViews {
def getOutputAttrs: Seq[Attribute] = Seq(
AttributeReference("namespace", StringType, nullable = false)(),
AttributeReference("viewName", StringType, nullable = false)(),
AttributeReference("isTemporary", BooleanType, nullable = false)())
}
/**
* The logical plan of the USE command.
*/
case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): SetCatalogAndNamespace = {
copy(child = newChild)
}
}
/**
* The logical plan of the REFRESH TABLE command.
*/
case class RefreshTable(child: LogicalPlan) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): RefreshTable =
copy(child = newChild)
}
/**
* The logical plan of the SHOW TBLPROPERTIES command.
*/
case class ShowTableProperties(
table: LogicalPlan,
propertyKey: Option[String],
override val output: Seq[Attribute] = ShowTableProperties.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = table
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}
object ShowTableProperties {
def getOutputAttrs: Seq[Attribute] = Seq(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)())
}
/**
* The logical plan that defines or changes the comment of an NAMESPACE for v2 catalogs.
*
* {{{
* COMMENT ON (DATABASE|SCHEMA|NAMESPACE) namespaceIdentifier IS ('text' | NULL)
* }}}
*
* where the `text` is the new comment written as a string literal; or `NULL` to drop the comment.
*
*/
case class CommentOnNamespace(child: LogicalPlan, comment: String) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): CommentOnNamespace =
copy(child = newChild)
}
/**
* The logical plan of the REFRESH FUNCTION command.
*/
case class RefreshFunction(child: LogicalPlan) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): RefreshFunction =
copy(child = newChild)
}
/**
* The logical plan of the DESCRIBE FUNCTION command.
*/
case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): DescribeFunction =
copy(child = newChild)
}
/**
* The logical plan of the CREATE FUNCTION command.
*/
case class CreateFunction(
child: LogicalPlan,
className: String,
resources: Seq[FunctionResource],
ifExists: Boolean,
replace: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): CreateFunction =
copy(child = newChild)
}
/**
* The logical plan of the DROP FUNCTION command.
*/
case class DropFunction(
child: LogicalPlan,
ifExists: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): DropFunction =
copy(child = newChild)
}
/**
* The logical plan of the SHOW FUNCTIONS command.
*/
case class ShowFunctions(
namespace: LogicalPlan,
userScope: Boolean,
systemScope: Boolean,
pattern: Option[String],
override val output: Seq[Attribute] = ShowFunctions.getOutputAttrs) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): ShowFunctions =
copy(namespace = newChild)
}
object ShowFunctions {
def getOutputAttrs: Seq[Attribute] = {
Seq(AttributeReference("function", StringType, nullable = false)())
}
}
/**
* The logical plan of the ANALYZE TABLE command.
*/
case class AnalyzeTable(
child: LogicalPlan,
partitionSpec: Map[String, Option[String]],
noScan: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): AnalyzeTable =
copy(child = newChild)
}
/**
* The logical plan of the ANALYZE TABLES command.
*/
case class AnalyzeTables(
namespace: LogicalPlan,
noScan: Boolean) extends UnaryCommand {
override def child: LogicalPlan = namespace
override protected def withNewChildInternal(newChild: LogicalPlan): AnalyzeTables =
copy(namespace = newChild)
}
/**
* The logical plan of the ANALYZE TABLE FOR COLUMNS command.
*/
case class AnalyzeColumn(
child: LogicalPlan,
columnNames: Option[Seq[String]],
allColumns: Boolean) extends UnaryCommand {
require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
"mutually exclusive. Only one of them should be specified.")
override protected def withNewChildInternal(newChild: LogicalPlan): AnalyzeColumn =
copy(child = newChild)
}
/**
* The logical plan of the ALTER TABLE ADD PARTITION command.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table ADD [IF NOT EXISTS]
* PARTITION spec1 [LOCATION 'loc1'][, PARTITION spec2 [LOCATION 'loc2'], ...];
* }}}
*/
case class AddPartitions(
table: LogicalPlan,
parts: Seq[PartitionSpec],
ifNotExists: Boolean) extends V2PartitionCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): AddPartitions =
copy(table = newChild)
}
/**
* The logical plan of the ALTER TABLE DROP PARTITION command.
* This may remove the data and metadata for this partition.
*
* If the `PURGE` option is set, the table catalog must remove partition data by skipping the trash
* even when the catalog has configured one. The option is applicable only for managed tables.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* }}}
*/
case class DropPartitions(
table: LogicalPlan,
parts: Seq[PartitionSpec],
ifExists: Boolean,
purge: Boolean) extends V2PartitionCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): DropPartitions =
copy(table = newChild)
}
/**
* The logical plan of the ALTER TABLE ... RENAME TO PARTITION command.
*/
case class RenamePartitions(
table: LogicalPlan,
from: PartitionSpec,
to: PartitionSpec) extends V2PartitionCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): RenamePartitions =
copy(table = newChild)
}
/**
* The logical plan of the ALTER TABLE ... RECOVER PARTITIONS command.
*/
case class RecoverPartitions(child: LogicalPlan) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): RecoverPartitions =
copy(child = newChild)
}
/**
* The logical plan of the LOAD DATA INTO TABLE command.
*/
case class LoadData(
child: LogicalPlan,
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): LoadData =
copy(child = newChild)
}
/**
* The logical plan of the SHOW CREATE TABLE command.
*/
case class ShowCreateTable(
child: LogicalPlan,
asSerde: Boolean = false,
override val output: Seq[Attribute] = ShowCreateTable.getoutputAttrs) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): ShowCreateTable =
copy(child = newChild)
}
object ShowCreateTable {
def getoutputAttrs: Seq[Attribute] = {
Seq(AttributeReference("createtab_stmt", StringType, nullable = false)())
}
}
/**
* The logical plan of the SHOW COLUMN command.
*/
case class ShowColumns(
child: LogicalPlan,
namespace: Option[Seq[String]],
override val output: Seq[Attribute] = ShowColumns.getOutputAttrs) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): ShowColumns =
copy(child = newChild)
}
object ShowColumns {
def getOutputAttrs: Seq[Attribute] = {
Seq(AttributeReference("col_name", StringType, nullable = false)())
}
}
/**
* The logical plan of the TRUNCATE TABLE command.
*/
case class TruncateTable(table: LogicalPlan) extends UnaryCommand {
override def child: LogicalPlan = table
override protected def withNewChildInternal(newChild: LogicalPlan): TruncateTable =
copy(table = newChild)
}
/**
* The logical plan of the TRUNCATE TABLE ... PARTITION command.
*/
case class TruncatePartition(
table: LogicalPlan,
partitionSpec: PartitionSpec) extends V2PartitionCommand {
override def allowPartialPartitionSpec: Boolean = true
override protected def withNewChildInternal(newChild: LogicalPlan): TruncatePartition =
copy(table = newChild)
}
/**
* The logical plan of the SHOW PARTITIONS command.
*/
case class ShowPartitions(
table: LogicalPlan,
pattern: Option[PartitionSpec],
override val output: Seq[Attribute] = ShowPartitions.getOutputAttrs)
extends V2PartitionCommand {
override def allowPartialPartitionSpec: Boolean = true
override protected def withNewChildInternal(newChild: LogicalPlan): ShowPartitions =
copy(table = newChild)
}
object ShowPartitions {
def getOutputAttrs: Seq[Attribute] = {
Seq(AttributeReference("partition", StringType, nullable = false)())
}
}
/**
* The logical plan of the DROP VIEW command.
*/
case class DropView(
child: LogicalPlan,
ifExists: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): DropView =
copy(child = newChild)
}
/**
* The logical plan of the MSCK REPAIR TABLE command.
*/
case class RepairTable(
child: LogicalPlan,
enableAddPartitions: Boolean,
enableDropPartitions: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): RepairTable =
copy(child = newChild)
}
/**
* The logical plan of the ALTER VIEW ... AS command.
*/
case class AlterViewAs(
child: LogicalPlan,
originalText: String,
query: LogicalPlan) extends BinaryCommand {
override def left: LogicalPlan = child
override def right: LogicalPlan = query
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)
}
/**
* The logical plan of the CREATE VIEW ... command.
*/
case class CreateView(
child: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
query: LogicalPlan,
allowExisting: Boolean,
replace: Boolean) extends BinaryCommand {
override def left: LogicalPlan = child
override def right: LogicalPlan = query
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)
}
/**
* The logical plan of the ALTER VIEW ... SET TBLPROPERTIES command.
*/
case class SetViewProperties(
child: LogicalPlan,
properties: Map[String, String]) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): SetViewProperties =
copy(child = newChild)
}
/**
* The logical plan of the ALTER VIEW ... UNSET TBLPROPERTIES command.
*/
case class UnsetViewProperties(
child: LogicalPlan,
propertyKeys: Seq[String],
ifExists: Boolean) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): UnsetViewProperties =
copy(child = newChild)
}
/**
* The logical plan of the ALTER TABLE ... SET [SERDE|SERDEPROPERTIES] command.
*/
case class SetTableSerDeProperties(
child: LogicalPlan,
serdeClassName: Option[String],
serdeProperties: Option[Map[String, String]],
partitionSpec: Option[TablePartitionSpec]) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): SetTableSerDeProperties =
copy(child = newChild)
}
/**
* The logical plan of the CACHE TABLE command.
*/
case class CacheTable(
table: LogicalPlan,
multipartIdentifier: Seq[String],
isLazy: Boolean,
options: Map[String, String],
isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CacheTable = {
assert(!isAnalyzed)
copy(table = newChildren.head)
}
override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
}
/**
* The logical plan of the CACHE TABLE ... AS SELECT command.
*/
case class CacheTableAsSelect(
tempViewName: String,
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
options: Map[String, String],
isAnalyzed: Boolean = false,
referredTempFunctions: Seq[String] = Seq.empty) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
assert(!isAnalyzed)
copy(plan = newChildren.head)
}
override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = {
copy(
isAnalyzed = true,
// Collect the referred temporary functions from AnalysisContext
referredTempFunctions = ac.referredTempFunctionNames.toSeq)
}
}
/**
* The logical plan of the UNCACHE TABLE command.
*/
case class UncacheTable(
table: LogicalPlan,
ifExists: Boolean,
isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): UncacheTable = {
assert(!isAnalyzed)
copy(table = newChildren.head)
}
override def childrenToAnalyze: Seq[LogicalPlan] = table :: Nil
override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true)
}
/**
* The logical plan of the CREATE INDEX command.
*/
case class CreateIndex(
table: LogicalPlan,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
columns: Seq[(FieldName, Map[String, String])],
properties: Map[String, String]) extends UnaryCommand {
override def child: LogicalPlan = table
override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved)
override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex =
copy(table = newChild)
}
/**
* The logical plan of the DROP INDEX command.
*/
case class DropIndex(
table: LogicalPlan,
indexName: String,
ignoreIfNotExists: Boolean) extends UnaryCommand {
override def child: LogicalPlan = table
override protected def withNewChildInternal(newChild: LogicalPlan): DropIndex =
copy(table = newChild)
}
case class TableSpec(
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
external: Boolean)
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦