
apache-kafka-streams - Is there a way to get the offset of each message consumed in Kafka Streams?


To avoid re-reading messages that were already processed but not yet committed when my Kafka Streams application is killed, I would like to get the offset of each message along with its key and value, so that I can store it somewhere and use it to avoid reprocessing messages that were already handled.

Best Answer

Yes, this is possible. See the FAQ entry at http://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information.

I will copy and paste the key information below:

Accessing record metadata such as topic, partition, and offset information?

Record metadata is accessible through the Processor API. It is also accessible indirectly through the DSL thanks to its Processor API integration.

With the Processor API, you can access record metadata through a ProcessorContext. You can store a reference to the context in an instance field of your processor during Processor#init(), and then query the processor context within Processor#process(), for example (same for Transformer). The context is updated automatically to match the record that is currently being processed, which means that methods such as ProcessorContext#partition() always return the current record’s metadata. Some caveats apply when calling the processor context within punctuate(), see the Javadocs for details.
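To illustrate the pattern described above, here is a minimal sketch against the older Processor API that the FAQ quotes (the class name and the use of stdout are my own choices, not from the source). In Kafka Streams 3.x the newer org.apache.kafka.streams.processor.api.Processor exposes the same information through context.recordMetadata() instead.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Stores the ProcessorContext in init() and reads per-record metadata
// in process(), as the FAQ describes.
public class OffsetLoggingProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // The runtime updates this context to match the record
        // currently being processed.
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // topic(), partition(), and offset() describe the current record.
        System.out.printf("topic=%s partition=%d offset=%d key=%s%n",
                context.topic(), context.partition(), context.offset(), key);
    }

    @Override
    public void close() {}
}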

If you use the DSL combined with a custom Transformer, for example, you could transform an input record’s value to also include partition and offset metadata, and subsequent DSL operations such as map or filter could then leverage this information.
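As a sketch of that DSL approach (topic names and the value encoding are placeholders I chose for illustration), a Transformer can embed the partition and offset into each value before handing the stream back to ordinary DSL operations:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OffsetEnrichExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // "input-topic" and "output-topic" are hypothetical names.
        KStream<String, String> input = builder.stream("input-topic");

        input
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    // Prepend partition and offset to the value so that
                    // subsequent map/filter operations can read them.
                    return KeyValue.pair(key,
                            context.partition() + ":" + context.offset() + ":" + value);
                }

                @Override
                public void close() {}
            })
            .to("output-topic");
        // Pass builder.build() plus your configuration to new KafkaStreams(...) to run.
    }
}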

Regarding "apache-kafka-streams - Is there a way to get the offset of each message consumed in Kafka Streams?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/44945136/
