gpt4 book ai didi

java - 如何在Flink Kafka Consumer中动态获取处理kafka主题名称?

转载 作者:行者123 更新时间:2023-11-30 05:32:22 26 4
gpt4 key购买 nike

目前,我有一个 Flink Cluster,想要通过一种 Pattern 来消费 Kafka Topic,通过这种方式,我们不需要维护一个硬编码的 Kafka Topic 列表。

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

我只是想知道,通过上述方式,在处理过程中如何才能知道真实的Kafka主题名称?谢谢。

--更新--我之所以需要知道主题信息,是因为我们需要这个主题名称作为参数,在接下来的 Flink Sink 部分中使用。

最佳答案

您可以实现自己的自定义 KafkaDeserializationSchema,如下所示:

  public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
@Override
public boolean isEndOfStream(Tuple2<String, String> nextElement) {
return false;
}

@Override
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
}

@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
}
}

使用自定义的KafkaDeserializationSchema,您可以创建元素包含主题信息的DataStream。在我的演示案例中,元素类型是 Tuple2<String, String> ,这样就可以通过 Tuple2#f0 访问主题名称.

FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
topicPattern, new CustomKafkaDeserializationSchema, kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

input.process(new ProcessFunction<Tuple2<String,String>, String>() {
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
String topicName = value.f0;
// your processing logic here.
out.collect(value.f1);
}
});

关于java - 如何在Flink Kafka Consumer中动态获取处理kafka主题名称?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57266072/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com