gpt4 book ai didi

java - Kafka Streams 在列表中收集流项目

转载 作者:行者123 更新时间:2023-11-30 10:16:30 25 4
gpt4 key购买 nike

我想在给定时间段或处理完 1000 个项目后,部分收集 Kafka Streams 列表中的项目。

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> kStream = builder.stream(topicTwo);
KTable<String, String> kTable = builder.table(topicOne);

kStream.join(kTable,
(streamValue, tableValue) -> new CustomObject(streamValue, tableValue)
.foreach((key, value) -> System.out.println(value));

KafkaStreams streams = new KafkaStreams(builder, streamProperties);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

这是我的代码。我不知道我说得够不够清楚,但我想要的是 List<CustomObject>每处理 1000 个项目或 5 秒后。这可能吗?

最佳答案

为此,我认为您需要定义自定义 Transformer .

在其transform方法,将消息追加到列表中。如果列表大小达到 1000 个元素,则创建一个新列表并返回旧列表。

在其init方法 schedule ProcessorContext 上的标点符号函数,它发出比您的时间窗口更早的元素列表。

使用transform加入后 KStream 上的方法将您的自定义转换器添加到您的拓扑中。

在您的转换器中,最好将缓冲元素保存在 StateStore 中。 ,而不是在内存列表中,以确保在故障转移/重新平衡时不会丢失任何消息。 StateStores 在 Kafka 中备份。

关于java - Kafka Streams 在列表中收集流项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50037038/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com