gpt4 book ai didi

java - Kafka Streams 生产者上的自定义 header

转载 作者:行者123 更新时间:2023-12-02 09:38:27 34 4
gpt4 key购买 nike

我在一个主题中有多个事件,我正在尝试按以下步骤进行处理:

  1. 根据 header 值过滤事件
  2. 应用解串器
  3. 按键分组
  4. 聚合以生成新的 KTable
  5. 新 KTable 将与具有新 header 的新事件流式传输到同一主题。

我可以使用transformValues访问 header ,但不确定如何在执行toStream时注入(inject)新的 header 值。

streamsBuilder.stream("my-topic")
.transformValues(new Transformer())//access headers here n filter few events
.groupByKey(Serialized.with(Serdes.String(),null)
.aggregate(()->my avro object initialization,(key,value,aggregate)->newValue(Value,aggregate),Materialized.as("my-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.ByteArray())
.mapValues((key,value)->convert to bytes).toStream()

注意:我是 KStream 的新手。

最佳答案

您可以使用Processor API添加自定义 header 。以与访问 header 相同的方式实现处理方法。

new Processor() { 
......
@override
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "key");
}
...
}

关于java - Kafka Streams 生产者上的自定义 header ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57309765/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com