gpt4 book ai didi

java - 是否可以使用 Kafka Streams 访问消息 header ?

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:36:41 26 4
gpt4 key购买 nike

加上 Headers对于 Kafka 0.11 中的记录( ProducerRecord & ConsumerRecord ),在使用 Kafka Streams 处理主题时是否可以获取这些 header ?当在 KStream 上调用类似 map 的方法时,它提供记录的 keyvalue 的参数,但没有我可以看到访问 headers 的方式。如果我们可以在 ConsumerRecordmap 就好了。

例如

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...

最佳答案

自版本 2.0.0 起可以访问记录 header (有关详细信息,请参见 KIP-244)。

您可以通过处理器 API(即通过 transform()transformValues()process())访问记录元数据,由给定的“上下文”对象(参见 https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context )。

更新

从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478 ),添加了一个新的类型安全的 api.Processor 类和 process(Record) 而不是 process(K, V) 方法。对于这种情况,可以通过 Record 类访问 header (和记录元数据)。

此新功能在“DSL 的 PAPI 方法中尚不可用(例如。KStream#process()KStream#transform() 和 sibling )。

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳——但不公开在旧版本中实际上由 Streams 读取时丢弃的 header 。

不过,元数据在 DSL 级别不可用。然而,通过 KIP-159 扩展 DSL 的工作也在进行中。 .

关于java - 是否可以使用 Kafka Streams 访问消息 header ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46736484/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com