gpt4 book ai didi

apache-kafka-streams - 根据部分数据属性更新KTable

转载 作者:行者123 更新时间:2023-12-04 10:58:56 27 4
gpt4 key购买 nike

我正在尝试使用对象的部分数据更新 KTable。
例如。用户对象是{"id":1, "name":"Joe", "age":28}对象被流式传输到一个主题中,并按 key 分组到 KTable 中。
现在用户对象部分更新如下{"id":1, "age":33}并流入表中。但更新后的表格如下 {"id":1, "name":null, "age":28} .
预期输出为 {"id":1, "name":"Joe", "age":33} .
如何使用 Kafka 流和 Spring Cloud 流来实现预期的输出。任何建议,将不胜感激。谢谢。

这是代码

 @Bean
public Function<KStream<String, User>, KStream<String, User>> process() {
return input -> input.map((key, user) -> new KeyValue<String, User>(user.getId(), user))
.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class))).reduce((user1, user2) -> {
user1.merge(user2);
return user1;
}, Materialized.as("allusers")).toStream();
}

并使用以下代码修改了 User 对象:
    public void merge(Object newObject) {
assert this.getClass().getName().equals(newObject.getClass().getName());
for (Field field : this.getClass().getDeclaredFields()) {
for (Field newField : newObject.getClass().getDeclaredFields()) {
if (field.getName().equals(newField.getName())) {
try {
field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
} catch (IllegalAccessException ignore) {
}
}
}
}
}

这是正确的方法还是 KStreams 中的任何其他方法?

最佳答案

我已经测试了您的合并代码,它似乎按预期工作。但是因为你的结果在reduce之后是 {"id":1, "name":null, "age":28} ,我能想到两件事:

  • 您的状态根本没有更新,因为没有任何属性发生变化。
  • 也许您有序列化问题,因为字符串属性为空,但其他 int 属性没问题。

  • 我的猜测是,因为您正在改变原始对象并返回相同的值,所以 kafka 流不会将其检测为更改并且不会存储新状态。实际上,您不应该改变您的对象,因为它可能会导致不确定性,具体取决于您的管道。

    尝试更改您的 merge创建新的函数 User对象,并查看行为是否发生变化。

    关于apache-kafka-streams - 根据部分数据属性更新KTable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58960806/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com