gpt4 book ai didi

java - Spark 流 : Writing number of rows read from a Kafka topic

转载 作者:行者123 更新时间:2023-11-29 04:14:30 25 4
gpt4 key购买 nike

Spark 流作业正在从繁忙的 kafka 主题中读取事件。为了了解每个触发间隔有多少数据进入,我只想输出从主题读取的行数。我尝试了多种方法来做到这一点,但无法弄清楚。

Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
// .option("failOnDataLoss", false)
// .option("maxOffsetsPerTrigger", 10000)
.load();
stream.selectExpr("topic").agg(count("topic")).as("count");
//stream.selectExpr("topic").groupBy("topic").agg(count(col("topic")).as("count"));
stream.writeStream()
.format("console")
.option("truncate", false)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start();

最佳答案

看来你需要

stream = stream.selectExpr("topic").agg(count("topic")).as("count");

然后你就可以打印了

关于java - Spark 流 : Writing number of rows read from a Kafka topic,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53227798/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com