gpt4 book ai didi

java - 如何从Kafka Topic获取记录总数并保存到HDFS中?

转载 作者:行者123 更新时间:2023-12-02 18:49:08 24 4
gpt4 key购买 nike

所有,

我正在处理从卡夫卡转储到HDFS中的数据。我能够使用数据,并希望从Kafka获取记录总数,并将其另存为文件到HDFS中,以便我可以将该文件用于验证。我可以在控制台中打印记录,但不确定如何创建总计数文件?

查询从卡夫卡提取记录:

Dataset ds1=ds.filter(args[5]);
StreamingQuery query = ds1
.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", args[6] + "/checkpoints" + args[2])
.trigger(Trigger.Once())
.start();

try {
query.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}

以及我为获取记录并在控制台中打印而编写的代码:
Dataset stream=ds1.groupBy("<column_name>").count(); //实际上,我想在不使用GroupBy的情况下获得计数,我尝试过 long stream=ds1.count(),但遇到错误。
 StreamingQuery query1=stream.coalesce(1)
.writeStream()
.format("csv")
.option("path", path + "/record")
.start();

try {
query1.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}

这是行不通的,您能帮我解决这个问题吗?

最佳答案

主题中任何时间的记录数都是移动的目标。

您将需要使用旧的Spark Streaming查找每个Spark分区批处理的记录数,然后使用Accumulator来计数所有已处理的记录,但这将是您可以获得的最接近记录。

据称Spark + Kafka仅具有一次处理语义,因此我建议您专注于错误捕获和监视,而不仅仅是计数验证。

关于java - 如何从Kafka Topic获取记录总数并保存到HDFS中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61945286/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com