spark RowLevelOperationRuntimeGroupFiltering 源码

  • 2022-10-20
  • 浏览 (115)

spark RowLevelOperationRuntimeGroupFiltering 代码


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}

/**
 * A rule that assigns a subquery to filter groups in row-level operations at runtime.
 *
 * Data skipping during job planning for row-level operations is limited to expressions that can be
 * converted to data source filters. Since not all expressions can be pushed down that way and
 * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
 * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
 * will inject a subquery to find all rows that match the condition so that data sources know
 * exactly which groups must be rewritten.
 *
 * Note this rule only applies to group-based row-level operations.
 */
case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
  extends Rule[LogicalPlan] with PredicateHelper {

  import DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    // apply special dynamic filtering only for group-based row-level operations
    case GroupBasedRowLevelOperation(replaceData, cond,
        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>

      // use reference equality on scan to find required scan relations
      val newQuery = replaceData.query transformUp {
        case r: DataSourceV2ScanRelation if r.scan eq scan =>
          // use the original table instance that was loaded for this row-level operation
          // in order to leverage a regular batch scan in the group filter query
          val originalTable = r.relation.table.asRowLevelOperationTable.table
          val relation = r.relation.copy(table = originalTable)
          val matchingRowsPlan = buildMatchingRowsPlan(relation, cond)

          val filterAttrs = scan.filterAttributes
          val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
          val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
          val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)

          Filter(dynamicPruningCond, r)
      }

      // optimize subqueries to rewrite them as joins and trigger job planning
      replaceData.copy(query = optimizeSubqueries(newQuery))
  }

  /**
   * Builds a plan that selects rows of `relation` matching `cond`.
   *
   * The relation is cloned with fresh expression IDs (via `transformUpWithNewOutput`)
   * so its attributes do not conflict with the same relation in the main query plan.
   */
  private def buildMatchingRowsPlan(
      relation: DataSourceV2Relation,
      cond: Expression): LogicalPlan = {

    val matchingRowsPlan = Filter(cond, relation)

    // clone the relation and assign new expr IDs to avoid conflicts
    matchingRowsPlan transformUpWithNewOutput {
      case r: DataSourceV2Relation if r eq relation =>
        val oldOutput = r.output
        val newOutput = oldOutput.map(_.newInstance())
        r.copy(output = newOutput) -> oldOutput.zip(newOutput)
    }
  }

  /**
   * Builds the runtime group-filter condition: one [[DynamicPruningSubquery]] per
   * filter attribute over the matching-rows plan, ANDed together.
   *
   * `onlyInBroadcast = false` means the subquery must always be executed, since the
   * data source relies on it to know which groups to rewrite.
   */
  private def buildDynamicPruningCond(
      matchingRowsPlan: LogicalPlan,
      buildKeys: Seq[Attribute],
      pruningKeys: Seq[Attribute]): Expression = {

    val buildQuery = Project(buildKeys, matchingRowsPlan)
    val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
      DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
    }
    dynamicPruningSubqueries.reduce(And)
  }
}

spark 源码目录


spark CleanupDynamicPruningFilters 源码

spark PartitionPruning 源码

spark PlanDynamicPruningFilters 源码

0  赞