spark TimeWindow source code

  • 2022-10-20

spark TimeWindow code

File path: /sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TreePattern.{TIME_WINDOW, TreePattern}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
  usage = """
    _FUNC_(time_column, window_duration[, slide_duration[, start_time]]) - Bucketize rows into one or more time windows given a timestamp specifying column.
      Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).
      Windows can support microsecond precision. Windows in the order of months are not supported.
      See <a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time">'Window Operations on Event Time'</a> in Structured Streaming guide doc for detailed explanation and examples.
  """,
  arguments = """
    Arguments:
      * time_column - The column or the expression to use as the timestamp for windowing by time. The time column must be of TimestampType.
      * window_duration - A string specifying the width of the window represented as "interval value".
        (See <a href="https://spark.apache.org/docs/latest/sql-ref-literals.html#interval-literal">Interval Literal</a> for more details.)
        Note that the duration is a fixed length of time, and does not vary over time according to a calendar.
      * slide_duration - A string specifying the sliding interval of the window represented as "interval value".
        A new window will be generated every `slide_duration`. Must be less than or equal to the `window_duration`.
        This duration is likewise absolute, and does not vary according to a calendar.
      * start_time - The offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals.
        For example, in order to have hourly tumbling windows that start 15 minutes past the hour,
        e.g. 12:15-13:15, 13:15-14:15... provide `start_time` as `15 minutes`.
  """,
  examples = """
    Examples:
      > SELECT a, window.start, window.end, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, _FUNC_(b, '5 minutes') ORDER BY a, start;
        A1	2021-01-01 00:00:00	2021-01-01 00:05:00	2
        A1	2021-01-01 00:05:00	2021-01-01 00:10:00	1
        A2	2021-01-01 00:00:00	2021-01-01 00:05:00	1
      > SELECT a, window.start, window.end, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, _FUNC_(b, '10 minutes', '5 minutes') ORDER BY a, start;
        A1	2020-12-31 23:55:00	2021-01-01 00:05:00	2
        A1	2021-01-01 00:00:00	2021-01-01 00:10:00	3
        A1	2021-01-01 00:05:00	2021-01-01 00:15:00	1
        A2	2020-12-31 23:55:00	2021-01-01 00:05:00	1
        A2	2021-01-01 00:00:00	2021-01-01 00:10:00	1
  """,
  group = "datetime_funcs",
  since = "2.0.0")
// scalastyle:on line.size.limit line.contains.tab
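// Worked example of the semantics documented above (illustrative, not part of
// the original file): with windowDuration = 10 minutes, slideDuration = 5
// minutes and startTime = 0, an event at 12:07 belongs to the two overlapping
// windows [12:00, 12:10) and [12:05, 12:15). When slideDuration equals
// windowDuration the windows tumble and every event lands in exactly one.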
case class TimeWindow(
    timeColumn: Expression,
    windowDuration: Long,
    slideDuration: Long,
    startTime: Long) extends UnaryExpression
  with ImplicitCastInputTypes
  with Unevaluable
  with NonSQLExpression {

  //////////////////////////
  // SQL Constructors
  //////////////////////////

  def this(
      timeColumn: Expression,
      windowDuration: Expression,
      slideDuration: Expression,
      startTime: Expression) = {
    this(timeColumn, TimeWindow.parseExpression(windowDuration),
      TimeWindow.parseExpression(slideDuration), TimeWindow.parseExpression(startTime))
  }

  def this(timeColumn: Expression, windowDuration: Expression, slideDuration: Expression) = {
    this(timeColumn, TimeWindow.parseExpression(windowDuration),
      TimeWindow.parseExpression(slideDuration), 0)
  }

  def this(timeColumn: Expression, windowDuration: Expression) = {
    this(timeColumn, windowDuration, windowDuration)
  }
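
  // How the SQL argument forms map onto these constructors (sketch):
  //   window(b, '5 minutes')                            -> tumbling: slide == width, startTime = 0
  //   window(b, '10 minutes', '5 minutes')              -> sliding, startTime = 0
  //   window(b, '10 minutes', '5 minutes', '1 minute')  -> sliding with a start offset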

  override def child: Expression = timeColumn
  override def inputTypes: Seq[AbstractDataType] = Seq(AnyTimestampType)
  override def dataType: DataType = new StructType()
    .add(StructField("start", child.dataType))
    .add(StructField("end", child.dataType))
  override def prettyName: String = "window"
  final override val nodePatterns: Seq[TreePattern] = Seq(TIME_WINDOW)

  // This expression is replaced in the analyzer.
  override lazy val resolved = false

  /**
   * Validate the inputs for the window duration, slide duration, and start time in addition to
   * the input data type.
   */
  override def checkInputDataTypes(): TypeCheckResult = {
    val dataTypeCheck = super.checkInputDataTypes()
    if (dataTypeCheck.isSuccess) {
      if (windowDuration <= 0) {
        return TypeCheckFailure(s"The window duration ($windowDuration) must be greater than 0.")
      }
      if (slideDuration <= 0) {
        return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
      }
      if (slideDuration > windowDuration) {
        return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
          s" to the windowDuration ($windowDuration).")
      }
      if (startTime.abs >= slideDuration) {
        return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " +
          s"than the slideDuration ($slideDuration).")
      }
    }
    dataTypeCheck
  }
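
  // Examples of inputs the validation above rejects:
  //   windowDuration <= 0 or slideDuration <= 0
  //   slideDuration > windowDuration  (some timestamps would fall into no window)
  //   startTime.abs >= slideDuration  (an offset of a whole slide is redundant:
  //                                    it maps the window set onto itself)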

  override protected def withNewChildInternal(newChild: Expression): TimeWindow =
    copy(timeColumn = newChild)
}
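
// A sketch of the arithmetic the analyzer's TimeWindowing rule substitutes for
// this Unevaluable expression (paraphrased from that rule; details may differ
// across Spark versions):
//   maxNumOverlapping = ceil(windowDuration / slideDuration)
//   for (i <- 0 until maxNumOverlapping) {
//     lastStart   = ts - (ts - startTime + slideDuration) % slideDuration
//     windowStart = lastStart - i * slideDuration
//     windowEnd   = windowStart + windowDuration
//     keep (windowStart, windowEnd) where windowStart <= ts < windowEnd
//   }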

object TimeWindow {
  /**
   * Parses the interval string for a valid time duration. CalendarInterval expects interval
   * strings to start with the string `interval`. For usability, we prepend `interval` to the string
   * if the user omitted it.
   *
   * @param interval The interval string
   * @return The interval duration in microseconds. Spark SQL's TimestampType has microsecond
   *         precision.
   */
  def getIntervalInMicroSeconds(interval: String): Long = {
    val cal = IntervalUtils.fromIntervalString(interval)
    if (cal.months != 0) {
      throw new IllegalArgumentException(
        s"Intervals greater than a month is not supported ($interval).")
    }
    Math.addExact(Math.multiplyExact(cal.days, MICROS_PER_DAY), cal.microseconds)
  }
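
  // Worked example:
  //   getIntervalInMicroSeconds("1 day 5 seconds")
  //     == 1 * MICROS_PER_DAY + 5 * 1000000L
  //     == 86400000000L + 5000000L == 86405000000L
  //   getIntervalInMicroSeconds("1 month") throws: month lengths vary, so a
  //   month is not a fixed number of microseconds.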

  /**
   * Parses the duration expression to generate the long value for the original constructor so
   * that we can use `window` in SQL.
   */
  def parseExpression(expr: Expression): Long = expr match {
    case NonNullLiteral(s, StringType) => getIntervalInMicroSeconds(s.toString)
    case IntegerLiteral(i) => i.toLong
    case NonNullLiteral(l, LongType) => l.toString.toLong
    case _ => throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
  }
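
  // Accepted duration literals (anything else throws
  // invalidLiteralForWindowDurationError):
  //   window(b, '5 minutes')   -- non-null string, parsed as an interval
  //   window(b, 300000000)     -- integer literal, taken as microseconds
  //   window(b, 300000000L)    -- long literal, taken as microseconds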

  def apply(
      timeColumn: Expression,
      windowDuration: String,
      slideDuration: String,
      startTime: String): TimeWindow = {
    TimeWindow(timeColumn,
      getIntervalInMicroSeconds(windowDuration),
      getIntervalInMicroSeconds(slideDuration),
      getIntervalInMicroSeconds(startTime))
  }
}
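
// The string-based apply above makes construction from parsed durations direct,
// e.g. TimeWindow(col, "10 minutes", "5 minutes", "0 seconds") is the same as
// TimeWindow(col, 600000000L, 300000000L, 0L).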

/**
 * Expression used internally to convert the TimestampType to Long and back without losing
 * precision, i.e. in microseconds. Used in time windowing.
 */
case class PreciseTimestampConversion(
    child: Expression,
    fromType: DataType,
    toType: DataType) extends UnaryExpression with ExpectsInputTypes with NullIntolerant {
  override def inputTypes: Seq[AbstractDataType] = Seq(fromType)
  override def dataType: DataType = toType
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val eval = child.genCode(ctx)
    ev.copy(code = eval.code +
      code"""boolean ${ev.isNull} = ${eval.isNull};
         |${CodeGenerator.javaType(dataType)} ${ev.value} = ${eval.value};
       """.stripMargin)
  }
  override def nullSafeEval(input: Any): Any = input

  override protected def withNewChildInternal(newChild: Expression): PreciseTimestampConversion =
    copy(child = newChild)
}
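
For context, here is a minimal sketch of how this expression is reached from user code: the window function in org.apache.spark.sql.functions builds a TimeWindow underneath. The session setup and sample data below are illustrative only and mirror the SQL examples in the ExpressionDescription above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object TimeWindowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("TimeWindowDemo").getOrCreate()
    import spark.implicits._

    val events = Seq(
      ("A1", "2021-01-01 00:00:00"),
      ("A1", "2021-01-01 00:04:30"),
      ("A1", "2021-01-01 00:06:00"),
      ("A2", "2021-01-01 00:01:00")
    ).toDF("a", "b").withColumn("b", $"b".cast("timestamp"))

    // Tumbling 5-minute windows; mirrors the first SQL example above.
    events
      .groupBy($"a", window($"b", "5 minutes"))
      .count()
      .orderBy($"a", $"window.start")
      .show(truncate = false)

    // Sliding windows: 10-minute width sliding every 5 minutes, so each
    // event can appear in up to two windows.
    events
      .groupBy($"a", window($"b", "10 minutes", "5 minutes"))
      .count()
      .orderBy($"a", $"window.start")
      .show(truncate = false)

    spark.stop()
  }
}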
