gpt4 book ai didi

java - Kafka Streams SessionWindows 中的节流

转载 作者:行者123 更新时间:2023-11-30 05:49:42 27 4
gpt4 key购买 nike

默认情况下,.windowsedBy(SessionWindows.with(...)) 将返回每个新的传入记录。那么,我如何才能等待至少 1 秒才能返回当前 session 窗口的最后一个结果?

我正在尝试使用字数统计示例:

        final KStream<String, String> source = builder.stream("streams-plaintext-input");

final KStream<String, Long> wordCounts = source

// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))

// Group the stream by word to ensure the key of the record is the word.
.groupBy((key, word) -> word)

.windowedBy(SessionWindows.with(Duration.ofSeconds(10)))

// Count the occurrences of each word (message key).
.count(Materialized.with(Serdes.String(), Serdes.Long()))

.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), Suppressed.BufferConfig.unbounded()))

// Convert to KStream<String, Long>
.toStream((windowedId, count) -> windowedId.key());

wordCounts.foreach((word, count) -> {
System.out.println(word + " : " + count);
});

这是producer的input和client的result,其实是错误的:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello kafka stream

(nothing)

>hello kafka stream

hello : 1
kafka : 1
stream : 1

>hello kafka stream

hello : null
kafka : 1
stream : 1

我该如何解决?非常感谢阅读我的问题:)

最佳答案

关于java - Kafka Streams SessionWindows 中的节流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56680361/

27 4 0