gpt4 book ai didi

java - 删除下游 ChangeLog 对象 KafkaStreams

转载 作者:行者123 更新时间:2023-12-01 20:03:40 27 4
gpt4 key购买 nike

我正在尝试删除下游变更日志中值为 null 的记录,我知道在状态存储中它们只是通过为 null(逻辑删除)而被删除,但是当您对 KTable 或 Stream 进行聚合时,它们会跳过null 并且不删除它。我需要想办法在聚合中设置删除标志,让 Kafka 知道可以删除记录。这是我的代码:

   public void deleteByEntity(String inputTopic, String target, String stateStoreName) {

// Need to set property to true in application.properties
// if ("true".equals(utils.getProperty(ApplicationConfigs.KAFKA_DELETE_BY_ENTITY))) {
Materialized<String, String, KeyValueStore<Bytes, byte[]>> storeName =
Materialized.as(stateStoreName);

StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> docStream = streamsBuilder.stream(inputTopic);

KTable<?, ?> dataInTable =
docStream
.groupByKey()
.reduce(
(value1, value2) -> {
// System.out.println("aa");
if (value1.equals(target)) {
// If key equals target topic return null, creates tombstone deletes from
// statestore, sends null record downstream
return null;
}
return value2;
},
storeName);
// System.out.println(dataInTable);
}

谢谢

最佳答案

如果您return null来自您的Reducer它将从存储中删除数据并且它将发送相应的输出记录 <key,null> 。因此,不需要下游处理。

请注意null键和null仅忽略 reduce()输入记录的值.

关于java - 删除下游 ChangeLog 对象 KafkaStreams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58997293/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com