
java - How to map Kafka topic names to their respective records in Spark Streaming

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 08:38:28

I am streaming from Kafka topics as shown below:

JavaPairInputDStream<String, String> directKafkaStream =
    KafkaUtils.createDirectStream(jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicSet);

directKafkaStream.print();

The output for one topic looks like this:

(null,"04/15/2015","18:44:14")
(null,"04/15/2015","18:44:15")
(null,"04/15/2015","18:44:16")
(null,"04/15/2015","18:44:17")

How can I map the topic name to its records?
For example, if the topic is "callData", the output should look like this:

(callData,"04/15/2015","18:44:14")
(callData,"04/15/2015","18:44:15")
(callData,"04/15/2015","18:44:16")
(callData,"04/15/2015","18:44:17")

Best answer

How do I map topic name and records?

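To extract the topic and partition information, you'll need to use the createDirectStream overload that accepts a Function receiving MessageAndMetadata<K, V> and returning the type you want to transform each record into.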

It looks like this:

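import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Starting offset for each topic partition to consume.
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("topicname", 0), 1L);

JavaInputDStream<Map.Entry> stream = KafkaUtils.createDirectStream(
    javaContext,
    String.class,         // key type
    String.class,         // value type
    StringDecoder.class,  // key decoder
    StringDecoder.class,  // value decoder
    Map.Entry.class,      // <--- This is the record return type from the transformation.
    kafkaParams,
    fromOffsets,
    // Pair each message with the name of the topic it came from.
    messageAndMetadata ->
        new AbstractMap.SimpleEntry<>(messageAndMetadata.topic(),
                                      messageAndMetadata.message()));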

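Note that I used Map.Entry as a Java substitute for Scala's Tuple2. You could provide your own class with partition and message properties and use that for the transformation. Note that the type of the Kafka input stream is now JavaInputDStream<Map.Entry>, since that is what the transformation returns.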
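As an illustration of the "own class" option, here is a minimal sketch of such a record type. The name TopicMessage and its fields are hypothetical and not part of Spark or Kafka; it carries the topic name rather than the partition because that is what the question asks for, and it is made Serializable on the assumption that Spark may need to ship records between JVMs.

```java
import java.io.Serializable;
import java.util.Objects;

/**
 * Hypothetical record type pairing a Kafka topic name with a message payload.
 * Not part of Spark or Kafka; intended as a replacement for Map.Entry.
 */
final class TopicMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;
    private final String message;

    TopicMessage(String topic, String message) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.message = message; // a Kafka message value may be null
    }

    String getTopic() { return topic; }

    String getMessage() { return message; }

    /** Renders the record in the "(topic,message)" shape asked for in the question. */
    @Override
    public String toString() {
        return "(" + topic + "," + message + ")";
    }
}
```

With the overload shown above, TopicMessage.class would take the place of Map.Entry.class, the handler would become messageAndMetadata -> new TopicMessage(messageAndMetadata.topic(), messageAndMetadata.message()), and the stream type would become JavaInputDStream<TopicMessage>.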

Regarding "java - How to map Kafka topic names to their respective records in Spark Streaming", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37543695/
