gpt4 book ai didi

java - Kafka Streams KTable 外键连接无法按预期工作

转载 作者:行者123 更新时间:2023-12-04 14:48:12 30 4
gpt4 key购买 nike

我正在尝试在 Kafka Streams 中加入一个类似于许多文章的简单外键(例如: https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/ )。
当我尝试加入用户时 id (用户表的主键)与外键 user_idaccount_balance表生产AccountRecord对象,我收到以下错误:[-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.目标是最终交付 AccountRecord每次表中的任何字段更新时都指向一个主题。问题是,当我简单地分别打印用户表和帐户表时,外键和所有字段都被完全填充。我看不出有什么问题或为什么会发生此错误。这是我的代码片段:

    public void start_test(){
StreamsBuilder builder = new StreamsBuilder();

KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));

final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
);

// print the table
accountRecordTable
.toStream()
.print(Printed.toSysOut());

KafkaStreams stream = new KafkaStreams(builder.build(), properties);
stream.start();
}
任何指导都会有所帮助。我没有包含自定义 serde 代码或对象形状,但它们非常简单。如果您需要额外说明,请告诉我。
谢谢

最佳答案

您的消息是否包含关键记录? KTable 是对 changelog 流的抽象,其中每条数据记录代表一个更新,知道更新的方法是使用 key,对于 KTables 工作的此刻记录的 key 是非常重要的。
例如

AccountBalance<Key=10,Value={accountBalanceId=10,userId=777,balance=10}>
User<Key=777, Value={firstName="Panchito"}>
另一个观察结果是您的 Serde 键,如果您将 Long 定义为键,为什么要使用自定义 serde?
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.User()));

KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(Serdes.Long(), CustomSerdes.AccountBalance()))
也许您的 key 解串器将 key 发送为空值。检查您的自定义 Serde 的输出记录输出。
此外,您必须改进添加物化的 join 方法,因为您正在创建一个新对象,而 Kafka 不知道如何处理新对象。
      final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance()),
Materialized.with(Serdes.Long(), CustomSerdes.AccountBalanceSerde() )
);
尝试使用 JsonSerde 或 Avro 来创建您的自定义 Serdes。

关于java - Kafka Streams KTable 外键连接无法按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69589123/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com