gpt4 book ai didi

apache-kafka-streams - Tombstone 消息没有从 KTable 状态存储中删除记录?

转载 作者:行者123 更新时间:2023-12-04 08:21:31 25 4
gpt4 key购买 nike

我正在从 KStream 创建 KTable 处理数据。但是,当我使用键和空负载触发墓碑消息时,它不会从 KTable 中删除消息。

样本 -

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);

触发具有空值的消息后,我可以在具有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志“test-store”上的记录。看起来它甚至没有达到 reduce 方法,不确定我在这里缺少什么。

感谢您对此的任何帮助!

谢谢。

最佳答案

reduce() 的 JavaDocs 中所述

Records with {@code null} key or value are ignored.



因为, <key,null>记录被删除,因此 (genericRecord, v1) -> v1永远不会执行,不会将逻辑删除写入存储或更改日志主题。

对于您想到的用例,您需要使用指示“删除”的代理值,例如 Avro 记录中的 bool 标志。您的 reduce 函数需要检查标志并返回 null如果设置了标志;否则,它必须定期处理记录。

更新:

Apache Kafka 2.6 添加了 KStream#toTable()运算符(通过 KIP-523 )允许转换 KStreamKTable .

关于apache-kafka-streams - Tombstone 消息没有从 KTable 状态存储中删除记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50708252/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com