gpt4 book ai didi

java - 如何在 spring-cloud-stream-binder-kafka-streams :3. 1.1 中使用功能方法检索/设置 header

转载 作者:行者123 更新时间:2023-12-05 04:52:15 25 4
gpt4 key购买 nike

我正在使用 spring-cloud-stream-binder-kafka-streams:3.1.1 进行函数式编程。如何检索处理器函数中的所有 header

Java代码

@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
// TODO investigate headers on the incoming message
// For example, find partition key on which message was received and publish to same partition key on destination topic
return input -> input;
}
}

最佳答案

为了像那样访问 header ,您需要使用 Kafka Streams 中的低级处理器/转换器 API。您可以混合使用低级处理器 API 和 DSL,同时仍将其用作 Spring Cloud Stream 应用程序。参见 this更多细节。基本上,您需要在消费者的情况下使用处理器,在函数的情况下使用转换器。处理器是一个终端 API,不允许您继续。另一方面,当使用转换器时,您可以在检查 header 后将其作为 KStream 继续。例如,这是一个想法:

input -> input
.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<Object, String, KeyValue<Object, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}

@Override
public KeyValue<Object, String> transform(Object key, String value) {

// Here you can access the headers using this.context.headers()
return new KeyValue<>(key, value);
}

@Override
public void close() {

}
};
}
})
.map(...)
.groupBy(...)
...

查看transform 方法中的注释。在那里,您可以访问每条传入记录的标题。

通过查看您的问题,我发现您正在尝试获取传入记录的分区 ID。为此,您可以直接调用 context.partition()。我认为您不需要为此访问 header 。

这是一个 SO关于访问 header 的线程。

关于java - 如何在 spring-cloud-stream-binder-kafka-streams :3. 1.1 中使用功能方法检索/设置 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66582901/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com