gpt4 book ai didi

apache-kafka - 什么时候使用 Kafka 事务 API?

转载 作者:行者123 更新时间:2023-12-04 17:32:24 25 4
gpt4 key购买 nike

我试图理解 Kafka 的事务 API。 This link定义原子读-处理-写循环如下:

First, let’s consider what an atomic read-process-write cycle means. In a nutshell, it means that if an application consumes a message A at offset X of some topic-partition tp0, and writes message B to topic-partition tp1 after doing some processing on message A such that B = F(A), then the read-process-write cycle is atomic only if messages A and B are considered successfully consumed and published together, or not at all.



它进一步说如下:

Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:

  1. The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

  2. We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

  3. Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.”

We designed transaction APIs in Kafka to solve the second and third problems. Transactions enable exactly-once processing in read-process-write cycles by making these cycles atomic and by facilitating zombie fencing.



疑问:
  • 上面的第 2 点和第 3 点描述了何时会发生消息重复,这是使用事务 API 处理的。事务 API 是否也有助于在任何情况下避免消息丢失?
  • 大多数在线(例如, herehere )Kafka 事务 API 示例涉及:
    while (true)
    {
    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
    producer.beginTransaction();
    for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
    producer.commitTransaction();
    }

    这基本上是读-处理-写循环。那么事务 API 仅在读-处理-写循环中有用吗?
  • This文章给出了非读-处理-写场景中的事务API示例:
     producer.initTransactions();
    try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
    } catch(ProducerFencedException e) {
    producer.close();
    } catch(KafkaException e) {
    producer.abortTransaction();
    }

    它说:

    This allows a producer to send a batch of messages to multiple partitions such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers.



    这个例子是否正确并展示了另一种使用不同于读取-处理-写入循环的事务 API 的方法? (请注意,它也不会向事务提交偏移量。)
  • 在我的应用程序中,我只是使用来自 kafka 的消息,进行处理并将它们记录到数据库中。这就是我的整个管道。

    一个。所以,我猜这不是读-处理-写循环。 Kafka 事务 API 对我的场景有任何用处吗?

    湾此外,我需要确保每条消息都只处理一次。我猜设置 idempotent=true在生产者中就足够了,我不需要事务 API,对吗?

    C。我可能会运行多个管道实例,但我不会将处理输出写入 Kafka。所以我想这永远不会涉及僵尸(重复的生产者写给 kafka)。所以,我猜事务 API 不会帮助我避免重复处理场景,对吗? (我可能必须在同一个数据库事务中将偏移量与处理输出一起保存到数据库中,并在生产者重启期间读取偏移量以避免重复处理。)
  • 最佳答案

    a. So, I guess this is not read-process-write cycle. Is Kafka transactional API of any use to my scenario?



    它是一个读-写-写,除了你写的是数据库而不是 Kafka。 Kafka 有自己的事务管理器,因此在具有幂等性的事务中写入将启用一次处理,假设您可以正确恢复消费者写入处理器的状态。你不能用数据库来做到这一点,因为数据库的事务管理器不与 Kafka 的同步。你可以做的是确保即使 kafka 事务相对于你的数据库不是原子的,它们仍然最终是一致的。

    让我们假设您的消费者读取、写入数据库然后确认。如果数据库失败,您不会确认,您可以根据偏移量正常恢复。如果确认失败,您将处理两次并保存到数据库两次。如果您可以使此操作具有幂等性,那么您就是安全的。这意味着您的处理器必须是纯处理器并且 DB 必须进行重复数据删除:处理相同的消息两次应该总是导致 DB 上的相同结果。

    b. Also I need to ensure that each message is processed exactly once. I guess setting idempotent=true in producer will suffice and I dont need transactional API, right?



    假设您尊重 a 点的要求,在不同的存储上使用持久性处理恰好一次还要求在您的初始写入和复制之间,您正在保存的对象没有发生其他更改。想象一下,将一个值写为 X,然后其他一些参与者将其更改为 Y,然后重新处理消息并将其更改回 X。例如,可以通过将数据库表设为日志来避免这种情况,类似于 kafka 主题.

    C。我可能会运行多个管道实例,但我不会将处理输出写入 Kafka。所以我想这永远不会涉及僵尸(重复的生产者写给 kafka)。所以,我猜事务 API 不会帮助我避免重复处理场景,对吗? (我可能必须在同一个数据库事务中将偏移量与处理输出一起保存到数据库中,并在生产者重启期间读取偏移量以避免重复处理。)

    写入您从中消费的主题的生产者可能会创建僵尸消息。该生产者需要与 kafka 相处得很好,以便忽略僵尸。事务 API 与您的消费者一起将确保此生产者以原子方式写入并且您的消费者读取已提交的消息,尽管不是以原子方式。如果你想要一次,幂等性就足够了。如果消息应该以原子方式写入,则您也需要事务。无论哪种方式,您的读写/消费-生产处理器都需要是纯粹的,并且您必须进行重复数据删除。您的 DB 也是该处理器的一部分,因为 DB 是实际存在的那个。

    我在网上找了一下,也许这个链接对你有帮助: processing guarantees

    您发布的链接: exactly once semanticstransactions in kafka很棒。

    关于apache-kafka - 什么时候使用 Kafka 事务 API?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58239378/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com