gpt4 book ai didi

apache-flink - 如何迭代 Flink DataStream 中的每条消息?

转载 作者:行者123 更新时间:2023-12-02 04:04:07 25 4
gpt4 key购买 nike

我有一个来自 Kafka 的消息流,如下所示

DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));

如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法DataStream但它不会返回 Iterator<String> .

最佳答案

我认为您正在寻找MapFunction

DataStream<String> messageStream = env.addSource(
new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));

DataStream<Y> mappedMessages = messageStream
.map(new MapFunction<String, Y>() {
public Y map(String message) {
// do something with each message and return Y
}
});

如果您不想为每条传入消息发出恰好一条记录,请查看 FlatMapFunction

关于apache-flink - 如何迭代 Flink DataStream 中的每条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40392632/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com