gpt4 book ai didi

apache-kafka - 当 KTable 中缺少 key 时,处理 KStream 与 KTable 的连接

转载 作者:行者123 更新时间:2023-12-04 03:13:19 26 4
gpt4 key购买 nike

我最近开始试验 kafka 流。我有一个场景需要加入 KStreamKTable . KTable 可能是这种情况不包含某些键。在那种情况下,我得到一个 NullPointerException .

特别是我得到

stream-thread [StreamThread-1] Streams application error during processing: java.lang.NullPointerException



我不知道我该如何处理。我无法以某种方式过滤掉与表条目不对应的流记录。

更新

进一步观察,我发现我可以通过 ReadOnlyKeyValueStore 查询底层存储以查找键是否存在。界面。

在这种情况下,我的问题是,这是最好的方法吗?即根据本地存储中是否存在键来过滤要加入的流?

在这种情况下,我的第二个问题是,因为我关心利用 Global State Store在版本中引入 10.2在下一阶段,我是否应该期望我也能够以相同的方式查询 Global State Store ?

更新

之前的更新不准确,因为不可能从拓扑内部查询状态存储

最终更新

在更好地理解连接语义后,我能够解决这个问题,只是简化了 valueJoiner只返回结果,而不是对连接的值执行操作,并在连接后添加额外的过滤步骤以过滤掉空值。

最佳答案

我的问题的解决方案来自于了解 join语义好一点。

就像在数据库连接中一样(虽然我并不是说 Kstream 连接精确地遵循 db 连接概念),左连接操作会在缺少右侧键的地方产生具有空值的行。

所以最终我唯一要做的就是解耦我的 valueJoiner从随后的计算/操作(我需要对连接记录的字段执行一些计算并返回一个新构造的对象)并让它只返回连接值的数组。然后我可以过滤掉导致 null 的记录。通过检查这些数组来获取值。

根据 Matthias 的 J. Sax 建议,我使用了 0.10.2版本而不是 0.10.1兼容代理版本 0.10.1并更换整个leftJoin带有内连接的逻辑,无需过滤 null值。

关于apache-kafka - 当 KTable 中缺少 key 时,处理 KStream 与 KTable 的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43142081/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com