spark partitioning 源码
spark partitioning 代码
文件路径:/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.physical
import scala.annotation.tailrec
import scala.collection.mutable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, IntegerType}
/**
* Specifies how tuples that share common expressions will be distributed when a query is executed
* in parallel on many machines.
*
* Distribution here refers to inter-node partitioning of data. That is, it describes how tuples
* are partitioned across physical machines in a cluster. Knowing this property allows some
* operators (e.g., Aggregate) to perform partition local operations instead of global ones.
*/
sealed trait Distribution {
/**
* The required number of partitions for this distribution. If it's None, then any number of
* partitions is allowed for this distribution.
*/
def requiredNumPartitions: Option[Int]
/**
* Creates a default partitioning for this distribution, which can satisfy this distribution while
* matching the given number of partitions.
*/
def createPartitioning(numPartitions: Int): Partitioning
}
/**
* Represents a distribution where no promises are made about co-location of data.
*/
case object UnspecifiedDistribution extends Distribution {
override def requiredNumPartitions: Option[Int] = None
override def createPartitioning(numPartitions: Int): Partitioning = {
throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
}
}
/**
* Represents a distribution that only has a single partition and all tuples of the dataset
* are co-located.
*/
case object AllTuples extends Distribution {
override def requiredNumPartitions: Option[Int] = Some(1)
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(numPartitions == 1, "The default partitioning of AllTuples can only have 1 partition.")
SinglePartition
}
}
/**
* Represents data where tuples that share the same values for the `clustering`
* [[Expression Expressions]] will be co-located in the same partition.
*
* @param requireAllClusterKeys When true, `Partitioning` which satisfies this distribution,
* must match all `clustering` expressions in the same ordering.
*/
case class ClusteredDistribution(
clustering: Seq[Expression],
requireAllClusterKeys: Boolean = SQLConf.get.getConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION),
requiredNumPartitions: Option[Int] = None) extends Distribution {
require(
clustering != Nil,
"The clustering expressions of a ClusteredDistribution should not be Nil. " +
"An AllTuples should be used to represent a distribution that only has " +
"a single partition.")
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
s"the actual number of partitions is $numPartitions.")
HashPartitioning(clustering, numPartitions)
}
/**
* Checks if `expressions` match all `clustering` expressions in the same ordering.
*
* `Partitioning` should call this to check its expressions when `requireAllClusterKeys`
* is set to true.
*/
def areAllClusterKeysMatched(expressions: Seq[Expression]): Boolean = {
expressions.length == clustering.length &&
expressions.zip(clustering).forall {
case (l, r) => l.semanticEquals(r)
}
}
}
/**
* Represents the requirement of distribution on the stateful operator in Structured Streaming.
*
* Each partition in stateful operator initializes state store(s), which are independent with state
* store(s) in other partitions. Since it is not possible to repartition the data in state store,
* Spark should make sure the physical partitioning of the stateful operator is unchanged across
* Spark versions. Violation of this requirement may bring silent correctness issue.
*
* Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
* stateful operator, only [[HashPartitioning]] (and HashPartitioning in
* [[PartitioningCollection]]) can satisfy this distribution.
* When `_requiredNumPartitions` is 1, [[SinglePartition]] is essentially same as
* [[HashPartitioning]], so it can satisfy this distribution as well.
*
* NOTE: This is applied only to stream-stream join as of now. For other stateful operators, we
* have been using ClusteredDistribution, which could construct the physical partitioning of the
* state in different way (ClusteredDistribution requires relaxed condition and multiple
* partitionings can satisfy the requirement.) We need to construct the way to fix this with
* minimizing possibility to break the existing checkpoints.
*
* TODO(SPARK-38204): address the issue explained in above note.
*/
case class StatefulOpClusteredDistribution(
expressions: Seq[Expression],
_requiredNumPartitions: Int) extends Distribution {
require(
expressions != Nil,
"The expressions for hash of a StatefulOpClusteredDistribution should not be Nil. " +
"An AllTuples should be used to represent a distribution that only has " +
"a single partition.")
override val requiredNumPartitions: Option[Int] = Some(_requiredNumPartitions)
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(_requiredNumPartitions == numPartitions,
s"This StatefulOpClusteredDistribution requires ${_requiredNumPartitions} " +
s"partitions, but the actual number of partitions is $numPartitions.")
HashPartitioning(expressions, numPartitions)
}
}
/**
* Represents data where tuples have been ordered according to the `ordering`
* [[Expression Expressions]]. Its requirement is defined as the following:
* - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or
* equal to any row in the first partition, according to the `ordering` expressions.
*
* In other words, this distribution requires the rows to be ordered across partitions, but not
* necessarily within a partition.
*/
case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
require(
ordering != Nil,
"The ordering expressions of an OrderedDistribution should not be Nil. " +
"An AllTuples should be used to represent a distribution that only has " +
"a single partition.")
override def requiredNumPartitions: Option[Int] = None
override def createPartitioning(numPartitions: Int): Partitioning = {
RangePartitioning(ordering, numPartitions)
}
}
/**
* Represents data where tuples are broadcasted to every node. It is quite common that the
* entire set of tuples is transformed into different data structure.
*/
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
override def requiredNumPartitions: Option[Int] = Some(1)
override def createPartitioning(numPartitions: Int): Partitioning = {
assert(numPartitions == 1,
"The default partitioning of BroadcastDistribution can only have 1 partition.")
BroadcastPartitioning(mode)
}
}
/**
* Describes how an operator's output is split across partitions. It has 2 major properties:
* 1. number of partitions.
* 2. if it can satisfy a given distribution.
*/
trait Partitioning {
/** Returns the number of partitions that the data is split across */
val numPartitions: Int
/**
* Returns true iff the guarantees made by this [[Partitioning]] are sufficient
* to satisfy the partitioning scheme mandated by the `required` [[Distribution]],
* i.e. the current dataset does not need to be re-partitioned for the `required`
* Distribution (it is possible that tuples within a partition need to be reorganized).
*
* A [[Partitioning]] can never satisfy a [[Distribution]] if its `numPartitions` doesn't match
* [[Distribution.requiredNumPartitions]].
*/
final def satisfies(required: Distribution): Boolean = {
required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
}
/**
* Creates a shuffle spec for this partitioning and its required distribution. The
* spec is used in the scenario where an operator has multiple children (e.g., join), and is
* used to decide whether this child is co-partitioned with others, therefore whether extra
* shuffle shall be introduced.
*
* @param distribution the required clustered distribution for this partitioning
*/
def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
throw new IllegalStateException(s"Unexpected partitioning: ${getClass.getSimpleName}")
/**
* The actual method that defines whether this [[Partitioning]] can satisfy the given
* [[Distribution]], after the `numPartitions` check.
*
* By default a [[Partitioning]] can satisfy [[UnspecifiedDistribution]], and [[AllTuples]] if
* the [[Partitioning]] only have one partition. Implementations can also overwrite this method
* with special logic.
*/
protected def satisfies0(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
case AllTuples => numPartitions == 1
case _ => false
}
}
case class UnknownPartitioning(numPartitions: Int) extends Partitioning
/**
* Represents a partitioning where rows are distributed evenly across output partitions
* by starting from a random target partition number and distributing rows in a round-robin
* fashion. This partitioning is used when implementing the DataFrame.repartition() operator.
*/
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
case object SinglePartition extends Partitioning {
val numPartitions = 1
override def satisfies0(required: Distribution): Boolean = required match {
case _: BroadcastDistribution => false
case _ => true
}
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
SinglePartitionShuffleSpec
}
/**
* Represents a partitioning where rows are split up across partitions based on the hash
* of `expressions`. All rows where `expressions` evaluate to the same values are guaranteed to be
* in the same partition.
*
* Since [[StatefulOpClusteredDistribution]] relies on this partitioning and Spark requires
* stateful operators to retain the same physical partitioning during the lifetime of the query
* (including restart), the result of evaluation on `partitionIdExpression` must be unchanged
* across Spark versions. Violation of this requirement may bring silent correctness issue.
*/
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
extends Expression with Partitioning with Unevaluable {
override def children: Seq[Expression] = expressions
override def nullable: Boolean = false
override def dataType: DataType = IntegerType
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case h: StatefulOpClusteredDistribution =>
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
if (requireAllClusterKeys) {
// Checks `HashPartitioning` is partitioned on exactly same clustering keys of
// `ClusteredDistribution`.
c.areAllClusterKeysMatched(expressions)
} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
}
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
HashShuffleSpec(this, distribution)
/**
* Returns an expression that will produce a valid partition ID(i.e. non-negative and is less
* than numPartitions) based on hashing expressions.
*/
def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
}
/**
* Represents a partitioning where rows are split across partitions based on transforms defined
* by `expressions`. `partitionValuesOpt`, if defined, should contain value of partition key(s) in
* ascending order, after evaluated by the transforms in `expressions`, for each input partition.
* In addition, its length must be the same as the number of input partitions (and thus is a 1-1
* mapping), and each row in `partitionValuesOpt` must be unique.
*
* For example, if `expressions` is `[years(ts_col)]`, then a valid value of `partitionValuesOpt` is
* `[0, 1, 2]`, which represents 3 input partitions with distinct partition values. All rows
* in each partition have the same value for column `ts_col` (which is of timestamp type), after
* being applied by the `years` transform.
*
* On the other hand, `[0, 0, 1]` is not a valid value for `partitionValuesOpt` since `0` is
* duplicated twice.
*
* @param expressions partition expressions for the partitioning.
* @param numPartitions the number of partitions
* @param partitionValuesOpt if set, the values for the cluster keys of the distribution, must be
* in ascending order.
*/
case class KeyGroupedPartitioning(
expressions: Seq[Expression],
numPartitions: Int,
partitionValuesOpt: Option[Seq[InternalRow]] = None) extends Partitioning {
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
if (requireAllClusterKeys) {
// Checks whether this partitioning is partitioned on exactly same clustering keys of
// `ClusteredDistribution`.
c.areAllClusterKeysMatched(expressions)
} else {
// We'll need to find leaf attributes from the partition expressions first.
val attributes = expressions.flatMap(_.collectLeaves())
attributes.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ =>
false
}
}
}
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
KeyGroupedShuffleSpec(this, distribution)
}
object KeyGroupedPartitioning {
def apply(
expressions: Seq[Expression],
partitionValues: Seq[InternalRow]): KeyGroupedPartitioning = {
KeyGroupedPartitioning(expressions, partitionValues.size, Some(partitionValues))
}
def supportsExpressions(expressions: Seq[Expression]): Boolean = {
def isSupportedTransform(transform: TransformExpression): Boolean = {
transform.children.size == 1 && isReference(transform.children.head)
}
@tailrec
def isReference(e: Expression): Boolean = e match {
case _: Attribute => true
case g: GetStructField => isReference(g.child)
case _ => false
}
expressions.forall {
case t: TransformExpression if isSupportedTransform(t) => true
case e: Expression if isReference(e) => true
case _ => false
}
}
}
/**
* Represents a partitioning where rows are split across partitions based on some total ordering of
* the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees:
* Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row
* in the first partition, according to the `ordering` expressions.
*
* This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as
* there is no overlap between partitions.
*
* This class extends expression primarily so that transformations over expression will descend
* into its child.
*/
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
extends Expression with Partitioning with Unevaluable {
override def children: Seq[SortOrder] = ordering
override def nullable: Boolean = false
override def dataType: DataType = IntegerType
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case OrderedDistribution(requiredOrdering) =>
// If `ordering` is a prefix of `requiredOrdering`:
// Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the
// RangePartitioning definition, any [a, b] in a previous partition must be smaller
// than any [a, b] in the following partition. This also means any [a, b, c] in a
// previous partition must be smaller than any [a, b, c] in the following partition.
// Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`.
//
// If `requiredOrdering` is a prefix of `ordering`:
// Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the
// RangePartitioning definition, any [a, b, c] in a previous partition must be smaller
// than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous
// partition which is larger than a [a2, b2] from the following partition, then there
// must be a [a1, b1 c1] larger than [a2, b2, c2], which violates RangePartitioning
// definition. So it's guaranteed that, any [a, b] in a previous partition must not be
// greater(i.e. smaller or equal to) than any [a, b] in the following partition. Thus
// `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
val minSize = Seq(requiredOrdering.size, ordering.size).min
requiredOrdering.take(minSize) == ordering.take(minSize)
case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
val expressions = ordering.map(_.child)
if (requireAllClusterKeys) {
// Checks `RangePartitioning` is partitioned on exactly same clustering keys of
// `ClusteredDistribution`.
c.areAllClusterKeysMatched(expressions)
} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
}
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
RangeShuffleSpec(this.numPartitions, distribution)
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): RangePartitioning =
copy(ordering = newChildren.asInstanceOf[Seq[SortOrder]])
}
/**
* A collection of [[Partitioning]]s that can be used to describe the partitioning
* scheme of the output of a physical operator. It is usually used for an operator
* that has multiple children. In this case, a [[Partitioning]] in this collection
* describes how this operator's output is partitioned based on expressions from
* a child. For example, for a Join operator on two tables `A` and `B`
* with a join condition `A.key1 = B.key2`, assuming we use HashPartitioning schema,
* there are two [[Partitioning]]s can be used to describe how the output of
* this Join operator is partitioned, which are `HashPartitioning(A.key1)` and
* `HashPartitioning(B.key2)`. It is also worth noting that `partitionings`
* in this collection do not need to be equivalent, which is useful for
* Outer Join operators.
*/
case class PartitioningCollection(partitionings: Seq[Partitioning])
extends Expression with Partitioning with Unevaluable {
require(
partitionings.map(_.numPartitions).distinct.length == 1,
s"PartitioningCollection requires all of its partitionings have the same numPartitions.")
override def children: Seq[Expression] = partitionings.collect {
case expr: Expression => expr
}
override def nullable: Boolean = false
override def dataType: DataType = IntegerType
override val numPartitions = partitionings.map(_.numPartitions).distinct.head
/**
* Returns true if any `partitioning` of this collection satisfies the given
* [[Distribution]].
*/
override def satisfies0(required: Distribution): Boolean =
partitionings.exists(_.satisfies(required))
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = {
val filtered = partitionings.filter(_.satisfies(distribution))
ShuffleSpecCollection(filtered.map(_.createShuffleSpec(distribution)))
}
override def toString: String = {
partitionings.map(_.toString).mkString("(", " or ", ")")
}
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): PartitioningCollection =
super.legacyWithNewChildren(newChildren).asInstanceOf[PartitioningCollection]
}
/**
* Represents a partitioning where rows are collected, transformed and broadcasted to each
* node in the cluster.
*/
case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
override val numPartitions: Int = 1
override def satisfies0(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
case BroadcastDistribution(m) if m == mode => true
case _ => false
}
}
/**
* This is used in the scenario where an operator has multiple children (e.g., join) and one or more
* of which have their own requirement regarding whether its data can be considered as
* co-partitioned from others. This offers APIs for:
*
* - Comparing with specs from other children of the operator and check if they are compatible.
* When two specs are compatible, we can say their data are co-partitioned, and Spark will
* potentially be able to eliminate shuffle if necessary.
* - Creating a partitioning that can be used to re-partition another child, so that to make it
* having a compatible partitioning as this node.
*/
trait ShuffleSpec {
/**
* Returns the number of partitions of this shuffle spec
*/
def numPartitions: Int
/**
* Returns true iff this spec is compatible with the provided shuffle spec.
*
* A true return value means that the data partitioning from this spec can be seen as
* co-partitioned with the `other`, and therefore no shuffle is required when joining the two
* sides.
*
* Note that Spark assumes this to be reflexive, symmetric and transitive.
*/
def isCompatibleWith(other: ShuffleSpec): Boolean
/**
* Whether this shuffle spec can be used to create partitionings for the other children.
*/
def canCreatePartitioning: Boolean
/**
* Creates a partitioning that can be used to re-partition the other side with the given
* clustering expressions.
*
* This will only be called when:
* - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
*/
def createPartitioning(clustering: Seq[Expression]): Partitioning =
throw new UnsupportedOperationException("Operation unsupported for " +
s"${getClass.getCanonicalName}")
}
case object SinglePartitionShuffleSpec extends ShuffleSpec {
override def isCompatibleWith(other: ShuffleSpec): Boolean = {
other.numPartitions == 1
}
override def canCreatePartitioning: Boolean = false
override def createPartitioning(clustering: Seq[Expression]): Partitioning =
SinglePartition
override def numPartitions: Int = 1
}
case class RangeShuffleSpec(
numPartitions: Int,
distribution: ClusteredDistribution) extends ShuffleSpec {
// `RangePartitioning` is not compatible with any other partitioning since it can't guarantee
// data are co-partitioned for all the children, as range boundaries are randomly sampled. We
// can't let `RangeShuffleSpec` to create a partitioning.
override def canCreatePartitioning: Boolean = false
override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec => numPartitions == 1
case ShuffleSpecCollection(specs) => specs.exists(isCompatibleWith)
// `RangePartitioning` is not compatible with any other partitioning since it can't guarantee
// data are co-partitioned for all the children, as range boundaries are randomly sampled.
case _ => false
}
}
case class HashShuffleSpec(
partitioning: HashPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
/**
* A sequence where each element is a set of positions of the hash partition key to the cluster
* keys. For instance, if cluster keys are [a, b, b] and hash partition keys are [a, b], the
* result will be [(0), (1, 2)].
*
* This is useful to check compatibility between two `HashShuffleSpec`s. If the cluster keys are
* [a, b, b] and [x, y, z] for the two join children, and the hash partition keys are
* [a, b] and [x, z], they are compatible. With the positions, we can do the compatibility check
* by looking at if the positions of hash partition keys from two sides have overlapping.
*/
lazy val hashKeyPositions: Seq[mutable.BitSet] = {
val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) =>
distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
}
partitioning.expressions.map(k => distKeyToPos.getOrElse(k.canonicalized, mutable.BitSet.empty))
}
override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec =>
partitioning.numPartitions == 1
case otherHashSpec @ HashShuffleSpec(otherPartitioning, otherDistribution) =>
// we need to check:
// 1. both distributions have the same number of clustering expressions
// 2. both partitioning have the same number of partitions
// 3. both partitioning have the same number of expressions
// 4. each pair of partitioning expression from both sides has overlapping positions in their
// corresponding distributions.
distribution.clustering.length == otherDistribution.clustering.length &&
partitioning.numPartitions == otherPartitioning.numPartitions &&
partitioning.expressions.length == otherPartitioning.expressions.length && {
val otherHashKeyPositions = otherHashSpec.hashKeyPositions
hashKeyPositions.zip(otherHashKeyPositions).forall { case (left, right) =>
left.intersect(right).nonEmpty
}
}
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ =>
false
}
override def canCreatePartitioning: Boolean = {
// To avoid potential data skew, we don't allow `HashShuffleSpec` to create partitioning if
// the hash partition keys are not the full join keys (the cluster keys). Then the planner
// will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
// the join keys.
if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) {
distribution.areAllClusterKeysMatched(partitioning.expressions)
} else {
true
}
}
override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
val exprs = hashKeyPositions.map(v => clustering(v.head))
HashPartitioning(exprs, partitioning.numPartitions)
}
override def numPartitions: Int = partitioning.numPartitions
}
case class KeyGroupedShuffleSpec(
partitioning: KeyGroupedPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
/**
* A sequence where each element is a set of positions of the partition expression to the cluster
* keys. For instance, if cluster keys are [a, b, b] and partition expressions are
* [bucket(4, a), years(b)], the result will be [(0), (1, 2)].
*
* Note that we only allow each partition expression to contain a single partition key.
* Therefore the mapping here is very similar to that from `HashShuffleSpec`.
*/
lazy val keyPositions: Seq[mutable.BitSet] = {
val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) =>
distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
}
partitioning.expressions.map { e =>
val leaves = e.collectLeaves()
assert(leaves.size == 1, s"Expected exactly one child from $e, but found ${leaves.size}")
distKeyToPos.getOrElse(leaves.head.canonicalized, mutable.BitSet.empty)
}
}
private lazy val ordering: Ordering[InternalRow] =
RowOrdering.createNaturalAscendingOrdering(partitioning.expressions.map(_.dataType))
override def numPartitions: Int = partitioning.numPartitions
override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
// Here we check:
// 1. both distributions have the same number of clustering keys
// 2. both partitioning have the same number of partitions
// 3. partition expressions from both sides are compatible, which means:
// 3.1 both sides have the same number of partition expressions
// 3.2 for each pair of partition expressions at the same index, the corresponding
// partition keys must share overlapping positions in their respective clustering keys.
// 3.3 each pair of partition expressions at the same index must share compatible
// transform functions.
// 4. the partition values, if present on both sides, are following the same order.
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
val expressions = partitioning.expressions
val otherExpressions = otherPartitioning.expressions
distribution.clustering.length == otherDistribution.clustering.length &&
numPartitions == other.numPartitions &&
expressions.length == otherExpressions.length && {
val otherKeyPositions = otherSpec.keyPositions
keyPositions.zip(otherKeyPositions).forall { case (left, right) =>
left.intersect(right).nonEmpty
}
} && expressions.zip(otherExpressions).forall {
case (l, r) => isExpressionCompatible(l, r)
} && partitioning.partitionValuesOpt.zip(otherPartitioning.partitionValuesOpt).forall {
case (left, right) => left.zip(right).forall { case (l, r) =>
ordering.compare(l, r) == 0
}
}
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ => false
}
private def isExpressionCompatible(left: Expression, right: Expression): Boolean =
(left, right) match {
case (_: LeafExpression, _: LeafExpression) => true
case (left: TransformExpression, right: TransformExpression) =>
left.isSameFunction(right)
case _ => false
}
override def canCreatePartitioning: Boolean = false
}
case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec {
override def isCompatibleWith(other: ShuffleSpec): Boolean = {
specs.exists(_.isCompatibleWith(other))
}
override def canCreatePartitioning: Boolean =
specs.forall(_.canCreatePartitioning)
override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
// as we only consider # of partitions as the cost now, it doesn't matter which one we choose
// since they should all have the same # of partitions.
require(specs.map(_.numPartitions).toSet.size == 1, "expected all specs in the collection " +
"to have the same number of partitions")
specs.head.createPartitioning(clustering)
}
override def numPartitions: Int = {
require(specs.nonEmpty, "expected specs to be non-empty")
specs.head.numPartitions
}
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦