gpt4 book ai didi

apache-flink - 流式传输到 parquet 文件对 flink 1.6.1 不满意

转载 作者:行者123 更新时间:2023-12-01 22:30:59 25 4
gpt4 key购买 nike

我对 flink(以及 parquet/hadoop)非常陌生,所以我肯定在做一些非常愚蠢的事情。我正在尝试创建一个接收器,它将我的数据源转储到 Parquet 文件中。

我的代码如下:

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);

val sink = StreamingFileSink.forBulkFormat(outputPath, ParquetAvroWriters.forReflectRecord(classOf[MyClass])).build()
testSource.addSink(sink)

不幸的是,我没有得到之前的异常,但它仍然没有生成正确的输出。我目前正在获取一个 .part-xxx 文件,其中包含 4B 数据。此流中大约有 20,000 条记录,因此这似乎不对。

在我开始写这个问题之前,我在 ParquetAvroWriters.java 第 84 行收到了一个未找到方法的异常。

该代码如下所示:

return AvroParquetWriter.<T>builder(out)
.withSchema(schema)
.withDataModel(dataModel)
.build();

AvroParquetWriter 方法签名是:

public static <T> Builder<T> builder(Path file)

但是 ParquetAvroWriters.java 调用它时的参数是 StreamOutputFile,因此出现 no method 错误。

我正在使用 flink 1.6.1 和 parquet-hadoop/parquet-avro 1.10.0。我到底应该如何设置来编写 Parquet 文件?

这真是令人沮丧 - 我什至找不到可以编译的示例。

最佳答案

读完人们的评论后,我创建了一个具有相同代码(类似)的项目,但你可以编译并执行。

object CustomSource {

case class TextOut(data:String )

def generateRandomStringSource(out: SourceContext[TextOut]) = {
val lines = Array("how are you", "you are how", " i am fine")
while (true) {
val index = Random.nextInt(3)
Thread.sleep(200)
out.collect(TextOut(lines(index)))
}
}

def main(args: Array[String]) {
val streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment

streamEnv.setParallelism(1)
streamEnv.enableCheckpointing(10000,
CheckpointingMode.EXACTLY_ONCE)
val sink = StreamingFileSink.forBulkFormat(new
Path("file:///tmp/test2"),
ParquetAvroWriters.forReflectRecord(classOf[TextOut])).build()

val customSource = streamEnv.addSource(generateRandomStringSource
_)

customSource.print()

customSource.addSink(sink)

streamEnv.execute()
}
}

我创建了一个项目来展示其运行方式以及所需的最少内容(jar 等)。

这是链接:https://github.com/jose1003/flinkparquet

BR

何塞

关于apache-flink - 流式传输到 parquet 文件对 flink 1.6.1 不满意,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52638193/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com