gpt4 book ai didi

scala - 如何使用 `ssc.fileStream()` 读取 Parquet 文件?传递给 `ssc.fileStream()` 的类型是什么?

转载 作者:可可西里 更新时间:2023-11-01 14:16:29 26 4
gpt4 key购买 nike

我对Spark的理解fileStream()方法是将三种类型作为参数:Key , Value , 和 Format .对于文本文件,适当的类型是:LongWritable , Text , 和 TextInputFormat .

首先,我想了解这些类型的本质。凭直觉,我猜 Key在本例中是文件的行号,Value是那一行的文字。因此,在以下文本文件示例中:

Hello
Test
Another Test

DStream 的第一行会有一个 Key1 ( 0 ?)和一个 ValueHello .

这是正确的吗?


我的问题的第二部分:我查看了 ParquetInputFormat 的反编译实现我注意到一些奇怪的事情:

public class ParquetInputFormat<T>
extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
//...

TextInputFormat延伸FileInputFormat类型 LongWritableText ,而 ParquetInputFormat扩展同一类类型 VoidT .

这是否意味着我必须创建一个 Value类来保存整行我的 Parquet 数据,然后传递类型 <Void, MyClass, ParquetInputFormat<MyClass>>ssc.fileStream()

如果是这样,我应该如何实现 MyClass


编辑 1:我注意到一个 readSupportClass这是要传递给ParquetInputFormat对象。这是一个什么样的类,它是如何用来解析parquet文件的?是否有一些文档涵盖了这一点?


编辑 2:据我所知,这是不可能的。如果有人知道如何将 Parquet 文件流式传输到 Spark,请随时分享...

最佳答案

下面是我在 Spark Streaming 中读取 Parquet 文件的示例。

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)

val lines = stream.map(row => {
println("row:" + row.toString())
row
})

有些点是...

  • 记录类型为GenericRecord
  • readSupportClass 是 AvroReadSupport
  • 将配置传递给 fileStream
  • 将 parquet.read.support.class 设置为配置

我引用了下面的源代码来创建示例。
而且我也找不到很好的例子。
我想等一个更好的。

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

关于scala - 如何使用 `ssc.fileStream()` 读取 Parquet 文件?传递给 `ssc.fileStream()` 的类型是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35413552/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com