gpt4 book ai didi

hadoop - Flink 1.6存储在.in-progress中的接收器HDFS文件

转载 作者:行者123 更新时间:2023-12-02 19:44:16 25 4
gpt4 key购买 nike

我正在将Kafka数据流写入HDFS路径中的存储区。 Kafka给出字符串数据。使用FlinkKafkaConsumer010从Kafka消费

-rw-r--r--   3 ubuntu supergroup    4097694 2018-10-19 19:16 /streaming/2018-10-19--19/_part-0-1.in-progress
-rw-r--r-- 3 ubuntu supergroup 3890083 2018-10-19 19:16 /streaming/2018-10-19--19/_part-1-1.in-progress
-rw-r--r-- 3 ubuntu supergroup 3910767 2018-10-19 19:16 /streaming/2018-10-19--19/_part-2-1.in-progress
-rw-r--r-- 3 ubuntu supergroup 4053052 2018-10-19 19:16 /streaming/2018-10-19--19/_part-3-1.in-progress

仅当我使用某些映射功能来动态操纵流数据时,才会发生这种情况。如果我直接将流写入HDFS,则可以正常工作。知道为什么会这样吗?我正在使用Flink 1.6.1,Hadoop 3.1.1和Oracle JDK1.8

最佳答案

这个问题有点晚了,但是我也遇到了类似的问题。
我有一个案例类地址

case class Address(val i: Int)

例如,我从集合中读取了带有地址数的来源
    env.fromCollection(Seq(new Address(...), ...)) 

...
val customAvroFileSink = StreamingFileSink
.forBulkFormat(
new Path("/tmp/data/"),
ParquetAvroWriters.forReflectRecord(classOf[Address]))
.build()
...
xxx.addSink(customAvroFileSink)

启用检查点后,我的 Parquet 文件也将以进行中的状态结束

我发现Flink在触发检查点之前完成了该过程,因此我的结果从未完全刷新到磁盘上。在将检查点间隔更改为较小的数字后, Parquet 不再进行中。

关于hadoop - Flink 1.6存储在.in-progress中的接收器HDFS文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52898825/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com