gpt4 book ai didi

apache-kafka - Kafka 连接消费者引用偏移量并存储在消息中

转载 作者:行者123 更新时间:2023-12-04 10:02:33 25 4
gpt4 key购买 nike

如果我使用 kafka-connect 来使用消息并存储到 s3(使用 kafka-connect s3 连接器),我是否可以将消息偏移量与事件负载一起存储?我想要这些数据来对消息进行排序,并检查是否存在任何差距或检查我收到的消息中是否有重复项。 (例如,如果我的消费者抵消被意外破坏并且我重新启动了 kafka-connect)。这是可能的还是我应该为这种类型的功能编写自定义订阅者?

最佳答案

根据 Insert Field 上的文档转换,你可以使用 offset.field:

Name            Description
offset.field Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).

总体而言,您的单一消息转换 (SMT) 配置如下所示:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"

如果这不是您想要的,那么总有一个选项可以创建您的 customised转换

关于apache-kafka - Kafka 连接消费者引用偏移量并存储在消息中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61757391/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com