gpt4 book ai didi

apache-spark - foreachBatches 在来自多个 Kafka 主题的流式查询中包含什么?

转载 作者:行者123 更新时间:2023-12-02 12:09:18 25 4
gpt4 key购买 nike

给定一个 DataStreamReader 配置为订阅多个主题,如下所示(请参阅 here ):

// Subscribe to multiple topics
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")

当我在此之上使用 foreachBatch 时,批处理将包含什么?

  • 每一批处理仅包含来自一个主题的消息?
  • 或者一个批处理可以包含来自不同主题的消息吗?

在我的用例中,我希望批量处理仅来自一个主题的消息。可以这样配置吗?

最佳答案

引用Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)中的官方文档:

// Subscribe to multiple topics

...
.option("subscribe", "topic1,topic2")

上面的代码是底层 Kafka 消费者(流查询)订阅的代码。

When I use foreachBatch on top of this, what will the batches contain?

  • Each batch will only contain messages from one topic?

这就是正确的答案。

I'd like to have batches with messages coming from one topic only. Is it possible to configure this?

这也记录在 Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) 中:

Each row in the source has the following schema:

...

topic

换句话说,输入数据集将具有 topic 列,其中包含给定行(记录)来自的主题名称。

为了让“一批消息仅来自一个主题”,您只需使用一个主题过滤where,例如

val messages: DataFrame = ...
assert(messages.isStreaming)

messages
.writeStream
.foreachBatch { case (df, batchId) =>
val topic1Only = df.where($"topic" === "topic1")
val topic2Only = df.where($"topic" === "topic2")
...
}

关于apache-spark - foreachBatches 在来自多个 Kafka 主题的流式查询中包含什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56989068/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com