gpt4 book ai didi

scala - 我可以从外部方法遍历 KTable 中的项目吗

转载 作者:行者123 更新时间:2023-12-04 21:05:45 26 4
gpt4 key购买 nike

我有一个 kafka 主题和一个听它的 KTable。

我想写一个 http POST 请求,它将遍历 ktable 中的当前项目,对它们执行一些操作并写回主题

所以基本上我有:

private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.cleanUp()
stream.start()

....
override def refreshTokens = {

accessTokenTable.mapValues {
new ValueMapper[String, String] {
override def apply(value: String) = {
value
}
}
}.print(token_topic_name)
}

当我尝试调用此方法时,没有任何内容打印/写入主题

我错过了什么?我唯一的选择是将消息从 ktable 写入 hashmap 并从那里读取吗?它错过了ktables的全部意义?

最佳答案

the correct solution is to use GlobalKTable to avoid "the state store may have migrated to another instance" errors as discussed here.



由于您回答了自己的问题,并且显然在后续跟进中遇到了另一个问题,让我扩展一下您在回答中所说的内容,以帮助该问题线程的其他读者。
  • 如果您使用的是 KTable (这是分区 = KTable 的每个“实例”只能看到总表数据的一部分)通常,您需要做的是防范此异常并重试。想一想:尝试-捕获-重试。
  • 如果您使用的是 GlobalKTable ,那么您就回避了这个问题,因为 GlobalKTable 的每个实例都有整个表数据的完整副本。

  • 注意:通常,您不会在 KTable 与 GlobalKTable 之间做出决定,因为您想防止“状态存储可能已迁移”的情况,而是因为这两种抽象为您的应用程序提供了不同的语义。例如,使用 KTable 而不是 GlobalKTable 有很多很好的理由——如果你这样做了,你只需要知道我们刚刚在这里讨论的内容(文档中也有介绍,但显然不明显/考虑到您确实遇到了这个问题,这已经足够清楚了)。

    希望这可以帮助!

    关于scala - 我可以从外部方法遍历 KTable 中的项目吗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46296241/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com