gpt4 book ai didi

java - Kafka : How to consume, 操作、确认(手动)并将结果发送到新主题

转载 作者:行者123 更新时间:2023-12-01 17:46:39 26 4
gpt4 key购买 nike

我目前正在使用 kafka-streams 库。

我想要做什么:(使用kafka-streams)我试图从主题消费,操纵消息值,确认该消息并将结果传输到另一个主题(代码如下)

Properties properties = new Properties();
.
.
properties.put("enable.auto.commit", false);

StreamBuilder builder = new StreamBuilder();
KStream kStream = builder.stream("MyTopic");
KafkaStream kafkaStream = new KafkaStream(builder.build(), properties)

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

kStream.forEach(new ForeachAction<String, String>(){

@Override
public void apply(Strings arg, String value){
//Just doing some simple data manipulation
String myValue = value + new Date().toString();

//Sending result to new topic
producer.send(new ProducerRecord<String,String>("MyTopicWithTimeStamp", myValue)):
// Problem (1) Here -> How do I acknowledge this from here manually

// Problem (2) How should I properly handle/close my producer (if at all)
}
});

kafkaStram.start();

我不知道该怎么做:使用 kafka-streams 库正确确认消息

最佳答案

您可以使用 KStream#to 直接写入 Kafka 输出主题,而不是创建和维护自己的生产者实例,如下所示:

final Properties props = new Properties();
...
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("MyTopic");
source.mapValues(record -> record + new Date().toString()).to("MyTopicWithTimeStamp",
Produced.with(Serdes.String(), Serdes.String()));
...
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

关于java - Kafka : How to consume, 操作、确认(手动)并将结果发送到新主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54430819/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com