apache-kafka - How to get Kafka headers in Kafka Streams

Reposted. Author: 行者123. Updated: 2023-12-04 01:29:48

I have the following Kafka Streams code:

    public class KafkaStreamHandler implements Processor<String, String> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        public KeyValue<String, KafkaStatusRecordWrapper> process(String key, String value) {
            Headers contexts = context.headers();
            contexts.forEach(header -> System.out.println(header));
        }

        public void StartFailstreamHandler() {
            StreamsBuilder builder = new StreamsBuilder();

            KStream<String, String> userStream =
                    builder.stream("usertopic", Consumed.with(Serdes.String(), Serdes.String()));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "failed-streams-userstream");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers");
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "500");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            // consumer_timeout_ms
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
            props.put("state.dir", "/tmp/kafka/stat");

            userStream.peek((key, value) -> System.out.println("key :" + key + " value :" + value));

            /* Make a few decisions based on the header */
            /* How to get the header? */

            userStream.map(this::process);
            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);

            kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
                }
            });

            kafkaStreams.cleanUp();
            kafkaStreams.start();
        }
    }

One of our clients now sends version information in a Kafka header, as shown below:

    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic", 1, "message");
    record.headers().add(new RecordHeader("version", "v1".getBytes()));
    producer.send(record);

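Based on this header, I need to choose a parser for my messages. How can I read the header using KStream operators?
I have looked through the whole Streams API, but no method exposes the headers.

I cannot switch to a plain Kafka consumer, because my application already depends on several KStream APIs.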

Best answer

A Processor does not let you chain further operators downstream in the DSL. Use transformValues instead, so that you can keep using the Streams DSL:

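  • First, extract the headers inside a ValueTransformerWithKey:

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class ExtractHeaderThenDoSomethingTransformer implements ValueTransformerWithKey<String, String, String> {

        ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public String transform(String readOnlyKey, String value) {
            Headers headers = context.headers();
            // Last header named "version"; null if the record carries none.
            Header version = headers.lastHeader("version");
            // Make decisions based on the header here. To filter on it, return null
            // and chain a filter operator after transformValues.
            return value;
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }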
  • Then add ExtractHeaderThenDoSomethingTransformer to your topology, like this:

    userStream
        .transformValues(ExtractHeaderThenDoSomethingTransformer::new)
        .map(this::process);
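
The question asks how to pick a parser from the "version" header. The following is a minimal sketch of that selection step only, written against the plain JDK so that it runs without Kafka on the classpath. The class name `VersionedParserSelector` and the two parser lambdas are hypothetical and not part of the original answer. Inside `transform()`, the byte array would presumably come from `context.headers().lastHeader("version")` and that header's `value()`. UTF-8 decoding is an assumption: the producer in the question calls `getBytes()` without a charset, which uses the platform default.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: picks a parser from the raw bytes of a "version" header. */
public class VersionedParserSelector {

    // Illustrative parsers only; real ones would build domain objects.
    private static final Map<String, Function<String, String>> PARSERS = Map.of(
            "v1", payload -> "v1-parsed:" + payload,
            "v2", payload -> "v2-parsed:" + payload);

    /** Decodes the header bytes as UTF-8 (assumed); returns null when the header is absent. */
    public static String decodeVersion(byte[] headerValue) {
        return headerValue == null ? null : new String(headerValue, StandardCharsets.UTF_8);
    }

    /** Returns the parsed payload, or null when the header is missing or the version is unknown. */
    public static String parse(byte[] versionHeader, String payload) {
        String version = decodeVersion(versionHeader);
        // Map.of rejects null keys on lookup, so guard before calling get().
        Function<String, String> parser = version == null ? null : PARSERS.get(version);
        return parser == null ? null : parser.apply(payload);
    }

    public static void main(String[] args) {
        byte[] header = "v1".getBytes(StandardCharsets.UTF_8);
        System.out.println(parse(header, "message")); // prints v1-parsed:message
        System.out.println(parse(null, "message"));   // prints null
    }
}
```

Returning null for a missing or unknown version fits the answer's suggestion: a filter chained after transformValues can then drop those records before they reach map.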

Regarding apache-kafka - How to get Kafka headers in Kafka Streams, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61270063/
