gpt4 book ai didi

scala - KafkaIO 检查点 - 如何向 Kafka 提交偏移量

转载 作者:行者123 更新时间:2023-12-04 18:57:39 24 4
gpt4 key购买 nike

我正在 Google Dataflow 中使用 Beam KafkaIO 源运行作业,但找不到一种简单的方法来在作业重新启动时保持偏移量(作业更新选项不够,我需要重新启 Action 业)

将 Beam 的 KafkaIO 与 PubSubIO 进行比较(或者准确地将 PubsubCheckpoint 与 KafkaCheckpointMark 进行比较),我可以看到 KafkaIO 中没有实现检查点持久性(KafkaCheckpointMark.finalizeCheckpoint 方法为空),而它是在 PubsubCheckpoint.finalizeCheckpoint 中实现的,它对 PubSub 进行确认。

这是否意味着我无法以最少的努力可靠地管理作业重新启动时的 Kafka 偏移量?

到目前为止我考虑过的选项:

  • 实现我自己的用于持久偏移的逻辑 - 听起来很复杂,我在 Scala 中通过 Scio 使用 Beam。
  • 什么都不做,否则会导致作业重新启动时出现许多重复(主题有 30 天的保留期)。
  • 启用自动提交,但这会导致消息丢失,甚至更糟。
  • 最佳答案

    有两个选项:启用 commitOffsetsInFinalize()在 KafkaIO 中或在 Kafka 消费者配置中启用自动提交。请注意,虽然 commitOffsetsInFinalize()与 Kafka 的自动提交相比,它与 Beam 中处理的内容更同步,它不提供严格的一次性处理保证。想象一个两阶段的管道,Dataflow 在第一阶段之后完成 Kafka 读取器,而无需等待第二阶段完成。如果您当时从头开始重新启动管道,您将不会处理完成第一阶段但尚未被第二阶段处理的记录。 PubsubIO 的问题没有什么不同。

    Regd 选项 (2) :您可以将 KafkaIO 配置为从特定时间戳开始读取(假设 Kafka 服务器支持它(版本 10+))。但看起来并不比启用 auto_commit 更好。

    也就是说,KafkaIO 应该支持 finalize。使用起来可能比启用 auto_commit 更简单(需要考虑频率等)。我们没有多少用户要求它。如果可以,请在 user@beam.apache.org 上提及。

    [更新:我在 PR 4481 中添加了对向 KafkaCheckpointMark 提交偏移量的支持]

    关于scala - KafkaIO 检查点 - 如何向 Kafka 提交偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48406321/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com