gpt4 book ai didi

apache-kafka-streams - 在kafka流中关闭窗口时如何发送有关主题的记录

转载 作者:行者123 更新时间:2023-12-01 10:23:17 27 4
gpt4 key购买 nike

实际上,我已经为此苦苦挣扎了几天。我正在使用来自 4 个主题的记录。我需要通过 TimedWindow 聚合记录。时间到了,我想向接收器主题发送批准的消息或未批准的消息。这可能与 kafka 流有关吗?

它似乎将每条记录都放入新主题,即使窗口仍然打开,这真的不是我想要的。

下面是简单的代码:

 builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(), 
Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String,
FooTriggerMessage>("", Serdes.String(),
fooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) -> value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), fooTriggerSerde))

.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))

.aggregate(() -> new BarApprovalMessage(), /* initializer */
(key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
Materialized
.<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
storeName) /* state store name */
.withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(),
Produced.with(windowedSerde, barApprovalSerde));

到目前为止,每条记录都被下沉到outgoingTopic,我只希望它在窗口关闭时发送一条消息,可以这么说。

这可能吗?

最佳答案

如果其他人需要答案,我会回答我自己的问题。在转换阶段,我使用上下文来创建调度程序。该调度程序采用三个参数。标点的间隔,使用的时间(挂钟或流时间)和供应商(满足时间时调用的方法)。我使用挂钟时间并为每个唯一的窗口键启动了一个新的调度程序。我在 KeyValue 存储中添加每条消息并返回 null。然后,在每 30 秒调用一次的方法中,我检查窗口是否关闭,并遍历 keystore 中的消息,聚合并使用 context.forward 和 context.commit。中提琴!在 30 秒的窗口中收到 4 条消息,产生了 1 条消息。

关于apache-kafka-streams - 在kafka流中关闭窗口时如何发送有关主题的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49027787/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com