gpt4 book ai didi

apache-kafka - 卡夫卡流 : How to ensure offset is committed after processing is completed

转载 作者:行者123 更新时间:2023-12-04 01:37:59 26 4
gpt4 key购买 nike

我想使用 Kafka 流处理存在于 Kafka 主题中的消息。

处理的最后一步是将结果放入数据库表中。为了避免与数据库争用相关的问题(该程序将运行 24*7 并处理数百万条消息),我将对 JDBC 调用使用批处理。

但是在这种情况下,消息有可能丢失(在一个场景中,我从一个主题中读取了 500 条消息,流将标记偏移量,现在程序失败。JDBC 批量更新中存在的消息丢失,但偏移量标记为那些消息)。

我想在数据库插入/更新完成后手动标记最后一条消息的偏移量,但根据以下问题是不可能的:How to commit manually with Kafka Stream? .

有人可以建议任何可能的解决方案吗

最佳答案

正如@sun007 的回答中所提到的,我宁愿稍微改变你的方法:

  • 使用 Kafka Streams 处理输入数据。让 Kafka Streams 应用程序将其输出写入 Kafka,而不是关系数据库。
  • 使用 Kafka Connect(例如,随时可用的 JDBC 连接器)将数据从 Kafka 摄取到关系数据库。根据需要配置和调整连接器,例如用于批量插入数据库。

  • 的这种解耦加工 (Kafka Streams) 和 摄取 (Kafka Connect) 通常是更可取的设计。例如,您不再将处理步骤与数据库的可用性结合起来:如果数据库关闭,您的 KStreams 应用程序为什么要停止?这是一个与处理逻辑无关的操作问题,您当然不想处理超时、重试等问题。 (即使您使用 Kafka Streams 以外的工具进行处理,这种解耦仍然是一个更可取的设置。)

    关于apache-kafka - 卡夫卡流 : How to ensure offset is committed after processing is completed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58819995/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com