spark ContinuousDataSourceRDD source code
File path: /sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.continuous

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.NextIterator

class ContinuousDataSourceRDDPartition(
    val index: Int,
    val inputPartition: InputPartition)
  extends Partition with Serializable {

  // This is semantically a lazy val - it's initialized once the first time a call to
  // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across
  // all compute() calls for a partition. This ensures that one compute() picks up where the
  // previous one ended.
  // We don't make it actually a lazy val because it needs input which isn't available here.
  // This will only be initialized on the executors.
  private[continuous] var queueReader: ContinuousQueuedDataReader = _
}

/**
 * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
 * to read from the remote source, and polls that queue for incoming rows.
 *
 * Note that continuous processing calls compute() multiple times, and the same
 * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
 */
class ContinuousDataSourceRDD(
    sc: SparkContext,
    dataQueueSize: Int,
    epochPollIntervalMs: Long,
    private val inputPartitions: Seq[InputPartition],
    schema: StructType,
    partitionReaderFactory: ContinuousPartitionReaderFactory,
    customMetrics: Map[String, SQLMetric])
  extends RDD[InternalRow](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    inputPartitions.zipWithIndex.map {
      case (inputPartition, index) => new ContinuousDataSourceRDDPartition(index, inputPartition)
    }.toArray
  }

  private def castPartition(split: Partition): ContinuousDataSourceRDDPartition = split match {
    case p: ContinuousDataSourceRDDPartition => p
    case _ => throw new SparkException(s"[BUG] Not a ContinuousDataSourceRDDPartition: $split")
  }

  /**
   * Initialize the shared reader for this partition if needed, then read rows from it until
   * it returns null to signal the end of the epoch.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // If attempt number isn't 0, this is a task retry, which we don't support.
    if (context.attemptNumber() != 0) {
      throw new ContinuousTaskRetryException()
    }

    val readerForPartition = {
      val partition = castPartition(split)
      if (partition.queueReader == null) {
        val partitionReader = partitionReaderFactory.createReader(
          partition.inputPartition)
        partition.queueReader = new ContinuousQueuedDataReader(
          partition.index, partitionReader, schema, context, dataQueueSize, epochPollIntervalMs)
      }

      partition.queueReader
    }

    val partitionReader = readerForPartition.getPartitionReader()
    new NextIterator[InternalRow] {
      private var numRow = 0L

      override def getNext(): InternalRow = {
        if (numRow % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
          CustomMetrics.updateMetrics(partitionReader.currentMetricsValues, customMetrics)
        }
        numRow += 1
        readerForPartition.next() match {
          case null =>
            finished = true
            null
          case row => row
        }
      }

      override def close(): Unit = {}
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    castPartition(split).inputPartition.preferredLocations()
  }
}
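
For context, the two tuning values this RDD takes (dataQueueSize and epochPollIntervalMs) are supplied by whatever plans the continuous scan. The sketch below shows, under stated assumptions, how a driver-side component might construct the RDD from pieces it already has. The object name ContinuousRDDExample, the helper createContinuousRDD, and the hard-coded queue size and poll interval are hypothetical and used only for illustration; in Spark these values normally come from SQL configuration rather than constants.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD
import org.apache.spark.sql.types.StructType

// Hypothetical sketch (not part of the file above): build the RDD from the inputs a
// continuous scan already has: partitions, schema, reader factory, and SQL metrics.
object ContinuousRDDExample {
  def createContinuousRDD(
      sc: SparkContext,
      partitions: Seq[InputPartition],
      schema: StructType,
      readerFactory: ContinuousPartitionReaderFactory,
      customMetrics: Map[String, SQLMetric]): RDD[InternalRow] = {
    // Fixed values keep the sketch self-contained; the real planner reads the executor
    // queue size and epoch poll interval from SQL configuration instead.
    val dataQueueSize = 1024
    val epochPollIntervalMs = 100L
    new ContinuousDataSourceRDD(
      sc, dataQueueSize, epochPollIntervalMs, partitions, schema, readerFactory, customMetrics)
  }
}

Each subsequent call to compute() for the same split reuses the partition's shared ContinuousQueuedDataReader, and every call returns a NextIterator that stops once the reader yields null to mark the end of the epoch.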
Related articles
spark ContinuousQueuedDataReader source code
spark ContinuousRateStreamSource source code
spark ContinuousTaskRetryException source code
spark ContinuousTextSocketSource source code