gpt4 book ai didi

apache-kafka - 在 SinkTask 中完成 "commit offsets"后立即使用 Kafka Connect HOWTO "put"

转载 作者:行者123 更新时间:2023-12-03 09:53:07 25 4
gpt4 key购买 nike

我正在使用 Kafka Connect 从 Kafka Broker (v0.10.2) 获取消息,然后将其同步到下游服务。

目前,我在 SinkTask#put 中有代码将处理 SinkRecord & 然后将其持久化到下游服务。

几个关键要求,

  • 我们需要确保消息至少持久化到下游服务一次。
  • 如果下游服务抛出错误或说它没有处理消息,那么我们需要确保再次重新读取消息。

  • 所以我们认为我们可以依靠 SinkTask#flush通过抛出异常或告诉 Connect 不要提交偏移量,而是在下一次轮询中重试,有效地退出为接收到的消息的特定轮询/周期提交偏移量。

    但正如我们发现的 flush实际上是基于时间的 & 或多或少独立于民意调查 & 当它达到某个时间阈值时,它将提交偏移量。

    在 0.10.2 SinkTask#preCommit被引入,所以我们认为我们可以将它用于我们的目的。但文档中没有提到 SinkTask#put 之间存在 1:1 的关系。 & SinkTask#preCommit .

    因为本质上我们想要 commit offsets尽快下单 put succeeds .同样, 不是 提交偏移量,如果是特定的 put失败的。

    如果不是通过 SinkTask#preCommit 如何做到这一点?

    最佳答案

    正确地将数据传入和传出 Kafka 可能具有挑战性,而 Kafka Connect 使这变得更容易,因为它使用最佳实践并隐藏了许多复杂性。对于接收器连接器,Kafka Connect 从主题读取消息,将它们发送到您的连接器,然后定期提交已读取和处理的各种主题分区的最大偏移量。

    请注意,“将它们发送到您的连接器”对应于 put(Collection<SinkRecord>)方法,并且在 Kafka Connect 提交偏移量之前可能会多次调用此方法。您可以控制 Kafka Connect 提交偏移量的频率,但 Kafka Connect 确保它只会在连接器成功处理该消息时提交该消息的偏移量。

    当连接器正常运行时,一切都很好,即使定期提交偏移量,您的连接器也会看到每条消息一次。但是,如果连接器失败,那么当它重新启动时,连接器将从上次提交的偏移量开始。这可能意味着您的连接器会看到它在崩溃前处理的一些相同的消息。如果您仔细编写连接器以使其至少具有一次语义,这通常不是问题。

    为什么 Kafka Connect 会定期提交偏移量而不是每条记录?因为它可以节省大量工作,并且在名义上进行时并不重要。只有当出现问题时,偏移滞后才重要。即便如此,如果您让 Kafka Connect 处理偏移量,您的连接器至少需要准备好处理一次消息。恰好一次是可能的,但您的连接器必须做更多的工作(见下文)。

    写作记录

    您在编写连接器时有很大的灵活性,这很好,因为在很大程度上取决于它所写入的外部系统的功能。让我们看看不同的实现方式 putflush .

    如果系统支持事务或可以处理一批更新,您的连接器的 put(Collection<SinkRecord>)可以使用单个事务/批处理写入该集合中的所有记录,根据需要重试多次,直到事务/批处理完成或最终抛出错误之前。在这种情况下,put做所有的工作,要么成功,要么失败。如果成功,那么 Kafka Connect 知道所有记录都得到了正确处理,因此可以(在某个时候)提交偏移量。如果您的 put调用失败,然后 Kafka Connect 假定不知道是否处理了任何记录,因此它不会更新其偏移量并停止您的连接器。您的连接器 flush(...)什么都不用做,因为 Kafka Connect 正在处理所有的偏移量。

    如果系统不支持事务,而您一次只能提交一个项目,则您可能拥有连接器的 put(Collection<SinkRecord>)尝试单独写出每个记录,阻塞直到它成功并在抛出错误之前根据需要重试每个记录。再次,put完成所有工作,flush方法可能不需要做任何事情。

    到目前为止,我的示例完成了 put 中的所有工作。 .您始终可以选择拥有 put简单地缓冲记录并改为在 flush 中完成写入外部服务的所有工作或 preCommit .您可能这样做的原因之一是您的写入是基于时间的,就像 flush 一样。和 preCommit .如果您不希望您的写入是基于时间的,您可能不想在 flush 中执行写入操作。或 preCommit .

    记录偏移或不记录

    如上所述,默认情况下 Kafka Connect 会定期记录偏移量,以便在重新启动时连接器可以从上次停止的地方开始。

    但是,有时需要连接器记录外部系统中的偏移量,尤其是当可以原子方式完成时。当这样的连接器启动时,它可以查看外部系统以找出最后写入的偏移量,然后可以告诉 Kafka Connect 它想要从哪里开始读取。使用这种方法,您的连接器可以只处理一次消息。

    当接收器连接器这样做时,它们实际上根本不需要 Kafka Connect 来提交任何偏移量。 flush方法只是让连接器知道 Kafka Connect 正在为您提交哪些偏移量的机会,并且由于它不返回任何内容,因此无法修改这些偏移量或告诉 Kafka Connect 连接器正在处理哪些偏移量。

    这是preCommit的地方方法进来了,真的是flush的替代品(它实际上采用与 flush 相同的参数),除了预计返回 Kafka Connect 应该提交的偏移量。默认情况下,preCommit只需拨打 flush然后返回传递给 preCommit 的相同偏移量,这意味着 Kafka Connect 应该提交它通过 preCommit 传递给连接器的所有偏移量。 .但是如果您的 preCommit返回一组空的偏移量,然后 Kafka Connect 将根本不记录任何偏移量。

    因此,如果您的连接器要处理外部系统中的所有偏移并且不需要 Kafka Connect 来记录任何内容,那么您应该覆盖 preCommit方法而不是 flush ,并返回一组空的偏移量。

    关于apache-kafka - 在 SinkTask 中完成 "commit offsets"后立即使用 Kafka Connect HOWTO "put",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45200585/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com