gpt4 book ai didi

java - 是否可以使用 Kafka Streams 访问消息头?

转载 作者:行者123 更新时间:2023-12-01 17:55:38 26 4
gpt4 key购买 nike

添加Headers对于Kafka 0.11中的记录( ProducerRecord & ConsumerRecord ),在使用Kafka Streams处理主题时是否可以获取这些 header ?当在 KStream 上调用 map 等方法时,它提供记录的 keyvalue 参数,但不提供我可以看到访问标题的方式。如果我们能够在 ConsumerRecord 上进行映射,那就太好了。

例如。

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...

最佳答案

自版本 2.0.0 起即可访问记录 header (有关详细信息,请参阅 KIP-244)。

您可以通过处理器 API 访问记录元数据(即通过 transform()transformValues()process()) ,通过给定的“上下文”对象(参见 https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context )。

更新

从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478 ),添加了一个新的类型安全的 api.Processor 类以及 process(Record) 而不是 process(K, V) 方法。对于这种情况,可以通过 Record 类访问 header (和记录元数据)。

此新功能在“DSL 的 PAPI 方法”中尚不可用(例如 KStream#process()KStream#transform() 和 sibling )。

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳,但不公开在旧版本中读取时实际上由 Streams 删除的 header 。

元数据在 DSL 级别不可用。然而,通过 KIP-159 扩展 DSL 的工作也在进行中。 .

关于java - 是否可以使用 Kafka Streams 访问消息头?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60722488/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com