spark RecordReaderIterator 源码
spark RecordReaderIterator 代码
文件路径:/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.io.Closeable
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
 * Wraps a Hadoop [[RecordReader]] so its values can be consumed through the standard
 * [[Iterator]] protocol, and closes the underlying reader as soon as it is exhausted.
 *
 * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
 * column batches by pretending they are rows.
 */
class RecordReaderIterator[T](
    private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
  // True when the reader has advanced to a value that next() has not yet consumed.
  private[this] var valueReady = false
  // True once the underlying reader has been exhausted (and eagerly closed).
  private[this] var exhausted = false

  override def hasNext: Boolean = {
    if (!exhausted && !valueReady) {
      valueReady = rowReader.nextKeyValue
      exhausted = !valueReady
      if (exhausted) {
        // Release the reader eagerly. close() will also run when the task completes,
        // but tasks that read many files benefit from freeing resources early.
        close()
      }
    }
    !exhausted
  }

  override def next(): T = {
    if (!hasNext) {
      throw QueryExecutionErrors.endOfStreamError()
    }
    // Mark the buffered value consumed so the next hasNext() advances the reader.
    valueReady = false
    rowReader.getCurrentValue
  }

  /**
   * Maps over this iterator while keeping the result [[Closeable]], so callers that
   * transform the stream can still release the underlying reader deterministically.
   */
  override def map[B](f: (T) => B): Iterator[B] with Closeable = {
    val self = this
    new Iterator[B] with Closeable {
      override def hasNext: Boolean = self.hasNext
      override def next(): B = f(self.next())
      override def close(): Unit = self.close()
    }
  }

  override def close(): Unit = {
    val reader = rowReader
    if (reader != null) {
      // Null out the reference even if close() throws, so repeated calls are no-ops
      // and the reader becomes eligible for GC.
      try reader.close() finally rowReader = null
    }
  }
}
相关信息
相关文章
spark AggregatePushDownUtils 源码
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦