gpt4 book ai didi

java - 为什么我看不到 Kafka Streams reduce 方法的任何输出?

转载 作者:搜寻专家 更新时间:2023-10-31 19:37:09 25 4
gpt4 key购买 nike

给定以下代码:

KStream<String, Custom> stream =  
builder.stream(Serdes.String(), customSerde, "test_in");

stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);

我在 Reducer 的 apply 方法中有一个 println 语句,当我期望减少发生时它会成功打印出来。但是,上面显示的最终打印语句什么也不显示。同样,如果我使用 to 方法而不是 print,我在目标主题中看不到任何消息。

在 reduce 语句之后我需要什么才能看到 reduce 的结果?如果一个值被推送到输入,我不希望看到任何东西。如果推送具有相同键的第二个值,我希望 reducer 应用(它确实如此)并且我还希望减少的结果继续到处理管道中的下一步。如上所述,我在管道的后续步骤中没有看到任何东西,我不明白为什么。

最佳答案

作为卡夫卡 0.10.1.0所有聚合运算符都使用内部重复数据删除缓存来减少结果 KTable 变更日志流的负载。例如,如果您直接计数并处理具有相同键的两条记录,则完整的变更日志流将为 <key:1>, <key:2> .

使用新的缓存功能,缓存将收到 <key:1>并存储它,但不会立即将其发送到下游。当<key:2>计算后,它会替换缓存的第一个条目。根据缓存大小、不同键的数量、吞吐量和您的提交间隔,缓存向下游发送条目。这发生在单个键条目的缓存逐出或完全刷新缓存(向下游发送所有条目)时。因此,KTable 更新日志可能只显示 <key:2> (因为 <key:1> 进行了去重)。

您可以通过Streams 配置参数控制缓存的大小StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG .如果将该值设置为零,则完全禁用缓存并且 KTable 更改日志将包含所有更新(有效地提供前 0.10.0.0 行为)。

Confluent 文档中有一节更详细地解释了缓存:

关于java - 为什么我看不到 Kafka Streams reduce 方法的任何输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40537084/

25 4 0