Spark interface.scala source code
File path: /sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.catalog
import java.net.URI
import java.time.{ZoneId, ZoneOffset}
import java.util.Date
import scala.collection.mutable
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A function defined in the catalog.
*
* @param identifier name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
* @param resources resource types and Uris used by the function
*/
case class CatalogFunction(
identifier: FunctionIdentifier,
className: String,
resources: Seq[FunctionResource])
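As an illustration, a minimal sketch of what a catalog function entry looks like; the database, class name and jar location below are hypothetical:

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}

object CatalogFunctionExample {
  def main(args: Array[String]): Unit = {
    // A catalog function backed by a class shipped in a jar (names are made up).
    val fn = CatalogFunction(
      identifier = FunctionIdentifier("my_upper", Some("default")),
      className = "com.example.MyUpper",
      resources = Seq(FunctionResource(JarResource, "hdfs:///udfs/my-upper.jar")))
    println(fn.identifier.unquotedString) // default.my_upper
  }
}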
/**
* Storage format, used to describe how a partition or a table is stored.
*/
case class CatalogStorageFormat(
locationUri: Option[URI],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
compressed: Boolean,
properties: Map[String, String]) {
override def toString: String = {
toLinkedHashMap.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("Storage(", ", ", ")")
}
def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
val map = new mutable.LinkedHashMap[String, String]()
locationUri.foreach(l => map.put("Location", l.toString))
serde.foreach(map.put("Serde Library", _))
inputFormat.foreach(map.put("InputFormat", _))
outputFormat.foreach(map.put("OutputFormat", _))
if (compressed) map.put("Compressed", "")
SQLConf.get.redactOptions(properties) match {
case props if props.isEmpty => // No-op
case props =>
map.put("Storage Properties", props.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]"))
}
map
}
}
object CatalogStorageFormat {
/** Empty storage format for default values and copies. */
val empty = CatalogStorageFormat(locationUri = None, inputFormat = None,
outputFormat = None, serde = None, compressed = false, properties = Map.empty)
}
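A quick sketch (with made-up location, input format, serde and properties) showing how a storage format is usually built from CatalogStorageFormat.empty and rendered by toString:

import java.net.URI
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

object StorageFormatExample {
  def main(args: Array[String]): Unit = {
    // Start from the empty format and fill in only the fields that matter.
    val storage = CatalogStorageFormat.empty.copy(
      locationUri = Some(new URI("file:/tmp/warehouse/events")),
      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
      serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
      properties = Map("serialization.format" -> "1"))
    // toString renders the non-empty fields via toLinkedHashMap, roughly:
    // Storage(Location: file:/tmp/warehouse/events, Serde Library: ..., InputFormat: ..., Storage Properties: [serialization.format=1])
    println(storage)
  }
}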
/**
* A partition (Hive style) defined in the catalog.
*
* @param spec partition spec values indexed by column name
* @param storage storage format of the partition
* @param parameters some parameters for the partition
* @param createTime creation time of the partition, in milliseconds
* @param lastAccessTime last access time, in milliseconds
* @param stats optional statistics (number of rows, total size, etc.)
*/
case class CatalogTablePartition(
spec: CatalogTypes.TablePartitionSpec,
storage: CatalogStorageFormat,
parameters: Map[String, String] = Map.empty,
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
stats: Option[CatalogStatistics] = None) {
def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
val map = new mutable.LinkedHashMap[String, String]()
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
map.put("Partition Values", s"[$specString]")
map ++= storage.toLinkedHashMap
if (parameters.nonEmpty) {
map.put("Partition Parameters", s"{" +
s"${SQLConf.get.redactOptions(parameters).map(p => p._1 + "=" + p._2).mkString(", ")}}")
}
map.put("Created Time", new Date(createTime).toString)
val lastAccess = {
if (lastAccessTime <= 0) "UNKNOWN" else new Date(lastAccessTime).toString
}
map.put("Last Access", lastAccess)
stats.foreach(s => map.put("Partition Statistics", s.simpleString))
map
}
override def toString: String = {
toLinkedHashMap.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("CatalogPartition(\n\t", "\n\t", ")")
}
/** Readable string representation for the CatalogTablePartition. */
def simpleString: String = {
toLinkedHashMap.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("", "\n", "")
}
/** Return the partition location, assuming it is specified. */
def location: URI = storage.locationUri.getOrElse {
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
throw QueryCompilationErrors.partitionNotSpecifyLocationUriError(specString)
}
/**
* Given the partition schema, returns a row with that schema holding the partition values.
*/
def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
val timeZoneId = caseInsensitiveProperties.getOrElse(
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
InternalRow.fromSeq(partitionSchema.map { field =>
val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
null
} else {
spec(field.name)
}
Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
})
}
}
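The toRow method casts the string-valued partition spec into typed values using the partition schema. A small sketch with an assumed two-column partition spec:

import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PartitionToRowExample {
  def main(args: Array[String]): Unit = {
    val part = CatalogTablePartition(
      spec = Map("dt" -> "2024-01-01", "hour" -> "3"),
      storage = CatalogStorageFormat.empty)
    val partitionSchema = StructType(Seq(
      StructField("dt", StringType),
      StructField("hour", IntegerType)))
    // Each partition value is cast from its string form to the declared column type;
    // the Hive default-partition marker would become null instead.
    println(part.toRow(partitionSchema, "UTC")) // [2024-01-01,3]
  }
}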
/**
* A container for bucketing information.
* Bucketing is a technology for decomposing data sets into more manageable parts, and the number
* of buckets is fixed so it does not fluctuate with data.
*
* @param numBuckets number of buckets.
* @param bucketColumnNames the names of the columns that used to generate the bucket id.
* @param sortColumnNames the names of the columns that used to sort data in each bucket.
*/
case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
sortColumnNames: Seq[String]) extends SQLConfHelper {
if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
throw QueryCompilationErrors.invalidBucketNumberError(
conf.bucketingMaxBuckets, numBuckets)
}
override def toString: String = {
val bucketString = s"bucket columns: [${bucketColumnNames.mkString(", ")}]"
val sortString = if (sortColumnNames.nonEmpty) {
s", sort columns: [${sortColumnNames.mkString(", ")}]"
} else {
""
}
s"$numBuckets buckets, $bucketString$sortString"
}
def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
mutable.LinkedHashMap[String, String](
"Num Buckets" -> numBuckets.toString,
"Bucket Columns" -> bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]"),
"Sort Columns" -> sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
)
}
}
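For example, a bucket spec for a table hashed into 8 buckets on user_id and sorted by ts inside each bucket (column names are hypothetical):

import org.apache.spark.sql.catalyst.catalog.BucketSpec

object BucketSpecExample {
  def main(args: Array[String]): Unit = {
    val spec = BucketSpec(numBuckets = 8, bucketColumnNames = Seq("user_id"), sortColumnNames = Seq("ts"))
    println(spec) // 8 buckets, bucket columns: [user_id], sort columns: [ts]
    // numBuckets <= 0, or a value above spark.sql.sources.bucketing.maxBuckets,
    // fails the check in the class body at construction time.
  }
}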
/**
* A table defined in the catalog.
*
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*
* @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
* Can be None if this table is a View, should be "hive" for hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
* @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
* When using a Hive Metastore, this flag is set to false if a case-
* sensitive schema was unable to be read from the table properties.
* Used to trigger case-sensitive schema inference at query time, when
* configured.
* @param ignoredProperties is a list of table properties that are used by the underlying table
* but ignored by Spark SQL yet.
* @param createVersion records the version of Spark that created this table metadata. The default
* is an empty string. We expect it will be read from the catalog or filled by
* ExternalCatalog.createTable. For temporary views, the value will be empty.
*/
case class CatalogTable(
identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: StructType,
provider: Option[String] = None,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
createVersion: String = "",
properties: Map[String, String] = Map.empty,
stats: Option[CatalogStatistics] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
ignoredProperties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None) {
import CatalogTable._
/**
* schema of this table's partition columns
*/
def partitionSchema: StructType = {
val partitionFields = schema.takeRight(partitionColumnNames.length)
assert(partitionFields.map(_.name) == partitionColumnNames)
StructType(partitionFields)
}
/**
* schema of this table's data columns
*/
def dataSchema: StructType = {
val dataFields = schema.dropRight(partitionColumnNames.length)
StructType(dataFields)
}
/** Return the database this table was specified to belong to, assuming it exists. */
def database: String = identifier.database.getOrElse {
throw QueryCompilationErrors.tableNotSpecifyDatabaseError(identifier)
}
/** Return the table location, assuming it is specified. */
def location: URI = storage.locationUri.getOrElse {
throw QueryCompilationErrors.tableNotSpecifyLocationUriError(identifier)
}
/** Return the fully qualified name of this table, assuming the database was specified. */
def qualifiedName: String = identifier.unquotedString
/**
* Return the current catalog and namespace (concatenated as a Seq[String]) of when the view was
* created.
*/
def viewCatalogAndNamespace: Seq[String] = {
if (properties.contains(VIEW_CATALOG_AND_NAMESPACE)) {
val numParts = properties(VIEW_CATALOG_AND_NAMESPACE).toInt
(0 until numParts).map { index =>
properties.getOrElse(
s"$VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX$index",
throw QueryCompilationErrors.corruptedTableNameContextInCatalogError(numParts, index)
)
}
} else if (properties.contains(VIEW_DEFAULT_DATABASE)) {
// Views created before Spark 3.0 can only access tables in the session catalog.
Seq(CatalogManager.SESSION_CATALOG_NAME, properties(VIEW_DEFAULT_DATABASE))
} else {
Nil
}
}
/**
* Return the SQL configs of when the view was created, the configs are applied when parsing and
* analyzing the view, should be empty if the CatalogTable is not a View or created by older
* versions of Spark(before 3.1.0).
*/
def viewSQLConfigs: Map[String, String] = {
try {
for ((key, value) <- properties if key.startsWith(CatalogTable.VIEW_SQL_CONFIG_PREFIX))
yield (key.substring(CatalogTable.VIEW_SQL_CONFIG_PREFIX.length), value)
} catch {
case e: Exception =>
throw QueryCompilationErrors.corruptedViewSQLConfigsInCatalogError(e)
}
}
/**
* Return the output column names of the query that creates a view, the column names are used to
* resolve a view, should be empty if the CatalogTable is not a View or created by older versions
* of Spark(before 2.2.0).
*/
def viewQueryColumnNames: Seq[String] = {
for {
numCols <- properties.get(VIEW_QUERY_OUTPUT_NUM_COLUMNS).toSeq
index <- 0 until numCols.toInt
} yield properties.getOrElse(
s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index",
throw QueryCompilationErrors.corruptedViewQueryOutputColumnsInCatalogError(numCols, index)
)
}
/**
* Return temporary view names the current view was referred. should be empty if the
* CatalogTable is not a Temporary View or created by older versions of Spark(before 3.1.0).
*/
def viewReferredTempViewNames: Seq[Seq[String]] = {
try {
properties.get(VIEW_REFERRED_TEMP_VIEW_NAMES).map { json =>
parse(json).asInstanceOf[JArray].arr.map { namePartsJson =>
namePartsJson.asInstanceOf[JArray].arr.map(_.asInstanceOf[JString].s)
}
}.getOrElse(Seq.empty)
} catch {
case e: Exception =>
throw QueryCompilationErrors.corruptedViewReferredTempViewInCatalogError(e)
}
}
/**
* Return temporary function names the current view was referred. should be empty if the
* CatalogTable is not a Temporary View or created by older versions of Spark(before 3.1.0).
*/
def viewReferredTempFunctionNames: Seq[String] = {
try {
properties.get(VIEW_REFERRED_TEMP_FUNCTION_NAMES).map { json =>
parse(json).asInstanceOf[JArray].arr.map(_.asInstanceOf[JString].s)
}.getOrElse(Seq.empty)
} catch {
case e: Exception =>
throw QueryCompilationErrors.corruptedViewReferredTempFunctionsInCatalogError(e)
}
}
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
locationUri: Option[URI] = storage.locationUri,
inputFormat: Option[String] = storage.inputFormat,
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
serde: Option[String] = storage.serde,
properties: Map[String, String] = storage.properties): CatalogTable = {
copy(storage = CatalogStorageFormat(
locationUri, inputFormat, outputFormat, serde, compressed, properties))
}
def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
val map = new mutable.LinkedHashMap[String, String]()
val tableProperties =
SQLConf.get.redactOptions(properties.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
.toSeq.sortBy(_._1)
.map(p => p._1 + "=" + p._2)
val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
val lastAccess = {
if (lastAccessTime <= 0) "UNKNOWN" else new Date(lastAccessTime).toString
}
identifier.catalog.foreach(map.put("Catalog", _))
identifier.database.foreach(map.put("Database", _))
map.put("Table", identifier.table)
if (owner != null && owner.nonEmpty) map.put("Owner", owner)
map.put("Created Time", new Date(createTime).toString)
map.put("Last Access", lastAccess)
map.put("Created By", "Spark " + createVersion)
map.put("Type", tableType.name)
provider.foreach(map.put("Provider", _))
bucketSpec.foreach(map ++= _.toLinkedHashMap)
comment.foreach(map.put("Comment", _))
if (tableType == CatalogTableType.VIEW) {
viewText.foreach(map.put("View Text", _))
viewOriginalText.foreach(map.put("View Original Text", _))
if (viewCatalogAndNamespace.nonEmpty) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
map.put("View Catalog and Namespace", viewCatalogAndNamespace.quoted)
}
if (viewQueryColumnNames.nonEmpty) {
map.put("View Query Output Columns", viewQueryColumnNames.mkString("[", ", ", "]"))
}
}
if (tableProperties.nonEmpty) {
map.put("Table Properties", tableProperties.mkString("[", ", ", "]"))
}
stats.foreach(s => map.put("Statistics", s.simpleString))
map ++= storage.toLinkedHashMap
if (tracksPartitionsInCatalog) map.put("Partition Provider", "Catalog")
if (partitionColumnNames.nonEmpty) map.put("Partition Columns", partitionColumns)
if (schema.nonEmpty) map.put("Schema", schema.treeString)
map
}
override def toString: String = {
toLinkedHashMap.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("CatalogTable(\n", "\n", ")")
}
/** Readable string representation for the CatalogTable. */
def simpleString: String = {
toLinkedHashMap.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("", "\n", "")
}
}
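A minimal sketch of constructing CatalogTable metadata for an assumed partitioned parquet table, showing how partitionSchema and dataSchema are derived from the trailing partition columns:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object CatalogTableExample {
  def main(args: Array[String]): Unit = {
    // Partition columns must appear last in the schema, in the same order as partitionColumnNames.
    val table = CatalogTable(
      identifier = TableIdentifier("events", Some("default")),
      tableType = CatalogTableType.MANAGED,
      storage = CatalogStorageFormat.empty,
      schema = new StructType()
        .add("id", IntegerType)
        .add("name", StringType)
        .add("dt", StringType),
      provider = Some("parquet"),
      partitionColumnNames = Seq("dt"))
    println(table.database)                                  // default
    println(table.qualifiedName)                             // default.events
    println(table.partitionSchema.fieldNames.mkString(","))  // dt
    println(table.dataSchema.fieldNames.mkString(","))       // id,name
  }
}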
object CatalogTable {
val VIEW_PREFIX = "view."
// Starting from Spark 3.0, we don't use this property any more. `VIEW_CATALOG_AND_NAMESPACE` is
// used instead.
val VIEW_DEFAULT_DATABASE = VIEW_PREFIX + "default.database"
val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts"
val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + "catalogAndNamespace.part."
// Convert the current catalog and namespace to properties.
def catalogAndNamespaceToProps(
currentCatalog: String,
currentNamespace: Seq[String]): Map[String, String] = {
val props = new mutable.HashMap[String, String]
val parts = currentCatalog +: currentNamespace
if (parts.nonEmpty) {
props.put(VIEW_CATALOG_AND_NAMESPACE, parts.length.toString)
parts.zipWithIndex.foreach { case (name, index) =>
props.put(s"$VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX$index", name)
}
}
props.toMap
}
val VIEW_SQL_CONFIG_PREFIX = VIEW_PREFIX + "sqlConfig."
val VIEW_QUERY_OUTPUT_PREFIX = VIEW_PREFIX + "query.out."
val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"
def splitLargeTableProp(
key: String,
value: String,
addProp: (String, String) => Unit,
defaultThreshold: Int): Unit = {
val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD)
.getOrElse(defaultThreshold)
if (value.length <= threshold) {
addProp(key, value)
} else {
val parts = value.grouped(threshold).toSeq
addProp(s"$key.numParts", parts.length.toString)
parts.zipWithIndex.foreach { case (part, index) =>
addProp(s"$key.part.$index", part)
}
}
}
def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
props.get(key).orElse {
if (props.filterKeys(_.startsWith(key)).isEmpty) {
None
} else {
val numParts = props.get(s"$key.numParts")
if (numParts.isEmpty) {
throw QueryCompilationErrors.cannotReadCorruptedTablePropertyError(key)
} else {
val parts = (0 until numParts.get.toInt).map { index =>
props.getOrElse(s"$key.part.$index", {
throw QueryCompilationErrors.cannotReadCorruptedTablePropertyError(
key, s"Missing part $index, $numParts parts are expected.")
})
}
Some(parts.mkString)
}
}
}
}
def isLargeTableProp(originalKey: String, propKey: String): Boolean = {
propKey == originalKey || propKey == s"$originalKey.numParts" ||
propKey.startsWith(s"$originalKey.part.")
}
def normalize(table: CatalogTable): CatalogTable = {
val nondeterministicProps = Set(
"CreateTime",
"transient_lastDdlTime",
"grantTime",
"lastUpdateTime",
"last_modified_by",
"last_modified_time",
"Owner:",
// The following are hive specific schema parameters which we do not need to match exactly.
"totalNumberFiles",
"maxFileSize",
"minFileSize"
)
table.copy(
createTime = 0L,
lastAccessTime = 0L,
properties = table.properties
.filterKeys(!nondeterministicProps.contains(_))
.map(identity)
.toMap,
stats = None,
ignoredProperties = Map.empty
)
}
}
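To illustrate the large-property helpers, a sketch that splits an over-long value into numbered part keys and reads it back; the property key "stats" and the 4000-character threshold are arbitrary:

import scala.collection.mutable
import org.apache.spark.sql.catalyst.catalog.CatalogTable

object LargeTablePropExample {
  def main(args: Array[String]): Unit = {
    val props = mutable.HashMap[String, String]()
    val longValue = "x" * 10000
    // With a 4000-character threshold the value is stored as
    // "stats.numParts" plus "stats.part.0" ... "stats.part.2".
    CatalogTable.splitLargeTableProp("stats", longValue, props.put, defaultThreshold = 4000)
    println(props.keys.toSeq.sorted)
    // readLargeTableProp stitches the parts back together.
    assert(CatalogTable.readLargeTableProp(props.toMap, "stats").contains(longValue))
  }
}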
/**
* This class of statistics is used in [[CatalogTable]] to interact with metastore.
* We define this new class instead of directly using [[Statistics]] here because there are no
* concepts of attributes in catalog.
*/
case class CatalogStatistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
colStats: Map[String, CatalogColumnStat] = Map.empty) {
/**
* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
* on column names.
*/
def toPlanStats(planOutput: Seq[Attribute], planStatsEnabled: Boolean): Statistics = {
if (planStatsEnabled && rowCount.isDefined) {
val attrStats = AttributeMap(planOutput
.flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
// Estimate size as number of rows * row size.
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
} else {
// When plan statistics are disabled or the table doesn't have other statistics,
// we apply the size-only estimation strategy and only propagate sizeInBytes in statistics.
Statistics(sizeInBytes = sizeInBytes)
}
}
/** Readable string representation for the CatalogStatistics. */
def simpleString: String = {
val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
s"$sizeInBytes bytes$rowCountString"
}
}
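A short sketch of the two estimation paths in toPlanStats, using an assumed single-column plan output:

import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object CatalogStatisticsExample {
  def main(args: Array[String]): Unit = {
    val stats = CatalogStatistics(sizeInBytes = BigInt(1048576), rowCount = Some(BigInt(10000)))
    println(stats.simpleString) // 1048576 bytes, 10000 rows

    val output = Seq(AttributeReference("id", IntegerType)())
    // With plan statistics enabled and a row count present, the size is re-estimated
    // from rowCount * estimated row width; otherwise only sizeInBytes is propagated.
    println(stats.toPlanStats(output, planStatsEnabled = true).rowCount)  // Some(10000)
    println(stats.toPlanStats(output, planStatsEnabled = false).rowCount) // None
  }
}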
/**
* This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
*/
case class CatalogColumnStat(
distinctCount: Option[BigInt] = None,
min: Option[String] = None,
max: Option[String] = None,
nullCount: Option[BigInt] = None,
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
histogram: Option[Histogram] = None,
version: Int = CatalogColumnStat.VERSION) {
/**
* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
* and the value is the string representation for the value.
* min/max values are stored as Strings. They can be deserialized using
* [[CatalogColumnStat.fromExternalString]].
*
* As part of the protocol, the returned map always contains a key called "version".
* Any of the fields that are null (None) won't appear in the map.
*/
def toMap(colName: String): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", CatalogColumnStat.VERSION.toString)
distinctCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
}
nullCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
}
avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
histogram.foreach { h =>
CatalogTable.splitLargeTableProp(
s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}",
HistogramSerializer.serialize(h),
map.put,
4000)
}
map.toMap
}
/** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
def toPlanStat(
colName: String,
dataType: DataType): ColumnStat =
ColumnStat(
distinctCount = distinctCount,
min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType, version)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
histogram = histogram,
version = version)
}
object CatalogColumnStat extends Logging {
// List of string keys used to serialize CatalogColumnStat
val KEY_VERSION = "version"
private val KEY_DISTINCT_COUNT = "distinctCount"
private val KEY_MIN_VALUE = "min"
private val KEY_MAX_VALUE = "max"
private val KEY_NULL_COUNT = "nullCount"
private val KEY_AVG_LEN = "avgLen"
private val KEY_MAX_LEN = "maxLen"
private val KEY_HISTOGRAM = "histogram"
val VERSION = 2
def getTimestampFormatter(
isParsing: Boolean,
format: String = "yyyy-MM-dd HH:mm:ss.SSSSSS",
zoneId: ZoneId = ZoneOffset.UTC): TimestampFormatter = {
TimestampFormatter(
format = format,
zoneId = zoneId,
isParsing = isParsing)
}
/**
* Converts from string representation of data type to the corresponding Catalyst data type.
*/
def fromExternalString(s: String, name: String, dataType: DataType, version: Int): Any = {
dataType match {
case BooleanType => s.toBoolean
case DateType if version == 1 => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
case DateType => DateFormatter().parse(s)
case TimestampType if version == 1 =>
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
case TimestampType => getTimestampFormatter(isParsing = true).parse(s)
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
case LongType => s.toLong
case FloatType => s.toFloat
case DoubleType => s.toDouble
case _: DecimalType => Decimal(s)
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => null
case _ =>
throw QueryCompilationErrors.columnStatisticsDeserializationNotSupportedError(
name, dataType)
}
}
/**
* Converts the given value from Catalyst data type to string representation of external
* data type.
*/
def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {
case DateType => DateFormatter().format(v.asInstanceOf[Int])
case TimestampType => getTimestampFormatter(isParsing = false).format(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
case _ =>
throw QueryCompilationErrors.columnStatisticsSerializationNotSupportedError(
colName, dataType)
}
externalValue.toString
}
/**
* Creates a [[CatalogColumnStat]] object from the given map.
* This is used to deserialize column stats from some external storage.
* The serialization side is defined in [[CatalogColumnStat.toMap]].
*/
def fromMap(
table: String,
colName: String,
map: Map[String, String]): Option[CatalogColumnStat] = {
try {
Some(CatalogColumnStat(
distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
.map(HistogramSerializer.deserialize),
version = map(s"${colName}.${KEY_VERSION}").toInt
))
} catch {
case NonFatal(e) =>
logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
None
}
}
}
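A round-trip sketch for column statistics: toMap flattens the stat into prefixed string keys, and fromMap rebuilds it (the table and column names are made up):

import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat

object ColumnStatRoundTripExample {
  def main(args: Array[String]): Unit = {
    val colStat = CatalogColumnStat(
      distinctCount = Some(BigInt(42)),
      min = Some("1"),
      max = Some("100"),
      nullCount = Some(BigInt(0)),
      avgLen = Some(4L),
      maxLen = Some(4L))
    // Serialized as flat keys: "age.version", "age.distinctCount", "age.min", "age.max", ...
    val serialized = colStat.toMap("age")
    println(serialized("age.distinctCount")) // 42
    // fromMap rebuilds the stat; on corrupted input it logs a warning and returns None.
    assert(CatalogColumnStat.fromMap("people", "age", serialized).contains(colStat))
  }
}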
case class CatalogTableType private(name: String)
object CatalogTableType {
val EXTERNAL = new CatalogTableType("EXTERNAL")
val MANAGED = new CatalogTableType("MANAGED")
val VIEW = new CatalogTableType("VIEW")
val tableTypes = Seq(EXTERNAL, MANAGED, VIEW)
}
/**
* A database defined in the catalog.
*/
case class CatalogDatabase(
name: String,
description: String,
locationUri: URI,
properties: Map[String, String])
object CatalogTypes {
/**
* Specifications of a table partition. Mapping column name to column value.
*/
type TablePartitionSpec = Map[String, String]
/**
* Initialize an empty spec.
*/
lazy val emptyTablePartitionSpec: TablePartitionSpec = Map.empty[String, String]
}
/**
* A placeholder for a table relation, which will be replaced by concrete relation like
* `LogicalRelation` or `HiveTableRelation`, during analysis.
*/
case class UnresolvedCatalogRelation(
tableMeta: CatalogTable,
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
override val isStreaming: Boolean = false) extends LeafNode {
assert(tableMeta.identifier.database.isDefined)
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
}
/**
* A wrapper to store the temporary view info, will be kept in `SessionCatalog`
* and will be transformed to `View` during analysis. If the temporary view is
* storing an analyzed plan, `plan` is set to the analyzed plan for the view.
*/
case class TemporaryViewRelation(
tableMeta: CatalogTable,
plan: Option[LogicalPlan] = None) extends LeafNode {
require(plan.isEmpty ||
(plan.get.resolved && tableMeta.properties.contains(VIEW_STORING_ANALYZED_PLAN)))
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
}
/**
* A `LogicalPlan` that represents a hive table.
*
* TODO: remove this after we completely make hive as a data source.
*/
case class HiveTableRelation(
tableMeta: CatalogTable,
dataCols: Seq[AttributeReference],
partitionCols: Seq[AttributeReference],
tableStats: Option[Statistics] = None,
@transient prunedPartitions: Option[Seq[CatalogTablePartition]] = None)
extends LeafNode with MultiInstanceRelation {
assert(tableMeta.identifier.database.isDefined)
assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
// The partition column should always appear after data columns.
override def output: Seq[AttributeReference] = dataCols ++ partitionCols
def isPartitioned: Boolean = partitionCols.nonEmpty
override def doCanonicalize(): HiveTableRelation = copy(
tableMeta = CatalogTable.normalize(tableMeta),
dataCols = dataCols.zipWithIndex.map {
case (attr, index) => attr.withExprId(ExprId(index))
},
partitionCols = partitionCols.zipWithIndex.map {
case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
},
tableStats = None
)
override def computeStats(): Statistics = {
tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
.orElse(tableStats)
.getOrElse {
throw new IllegalStateException("Table stats must be specified.")
}
}
override def newInstance(): HiveTableRelation = copy(
dataCols = dataCols.map(_.newInstance()),
partitionCols = partitionCols.map(_.newInstance()))
override def simpleString(maxFields: Int): String = {
val catalogTable = tableMeta.storage.serde match {
case Some(serde) => tableMeta.identifier :: serde :: Nil
case _ => tableMeta.identifier :: Nil
}
var metadata = Map(
"CatalogTable" -> catalogTable.mkString(", "),
"Data Cols" -> truncatedString(dataCols, "[", ", ", "]", maxFields),
"Partition Cols" -> truncatedString(partitionCols, "[", ", ", "]", maxFields)
)
if (prunedPartitions.nonEmpty) {
metadata += ("Pruned Partitions" -> {
val parts = prunedPartitions.get.map { part =>
val spec = part.spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
if (part.storage.serde.nonEmpty && part.storage.serde != tableMeta.storage.serde) {
s"($spec, ${part.storage.serde.get})"
} else {
s"($spec)"
}
}
truncatedString(parts, "[", ", ", "]", maxFields)
})
}
val metadataEntries = metadata.toSeq.map {
case (key, value) if key == "CatalogTable" => value
case (key, value) =>
key + ": " + StringUtils.abbreviate(value, SQLConf.get.maxMetadataStringLength)
}
val metadataStr = truncatedString(metadataEntries, "[", ", ", "]", maxFields)
s"$nodeName $metadataStr"
}
}