gpt4 book ai didi

apache-spark - 在 spark 中从 kafka 消息中获取主题

转载 作者:行者123 更新时间:2023-12-04 05:18:37 26 4
gpt4 key购买 nike

在我们的 spark-streaming 工作中,我们从 kafka 中读取流中的消息。

为此,我们使用 KafkaUtils.createDirectStream返回 JavaPairInputDStreamfrom 的 API .

通过以下方式从 kafka(来自三个主题 - test1、test2、test3)读取消息:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);

我们希望以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每个消息的主题名称。

所以我们做以下事情:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

这是 SplitToLinesFunction的实现:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}

问题在于 tuple2._1为空,我们假设 tuple2._1将包含一些元数据,例如消息来自的主题/分区的名称。

但是,当我们打印 tuple2._1 时,为空。

我们的问题 - 有没有办法在 kafka 中发送主题名称,以便在 Spark 流代码中, tuple2._1将包含它(并且不为空)?

请注意,我们还尝试从 spark-streaming kafka-integration tutorial 中提到的 DStream 中获取主题名称。 :

但它返回所有发送到 KafkaUtils.createDirectStream 的主题。 ,而不是消息(属于当前 RDD)来自的特定主题。

所以它没有帮助我们识别从 RDD 中发送消息的主题的名称。

编辑

回应大卫的回答 - 我尝试使用 MessageAndMetadata像这样:
        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{

@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}

}

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {

@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});

问题是在 MessageAndMetadataFunction.call 中没有打印任何内容。方法。我应该修复什么才能在 MessageAndMetadataFunction.call 中获得该 RDD 的相关主题方法?

最佳答案

使用 createDirectStream 的版本之一这需要一个 messageHandler函数作为参数。这是我所做的:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
ssc,
kafkaParams,
getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap,
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
)

有些东西对你来说没有任何意义——相关的部分是
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}

如果您不熟悉 Scala ,该函数所做的只是返回一个 Tuple2包含 msg.topicmsg.message .您的函数需要返回这两个,以便您在下游使用它们。您可以返回整个 MessageAndMetadata object 代替,它为您提供了一些其他有趣的字段。但如果您只想要 topicmessage ,然后使用上面的。

关于apache-spark - 在 spark 中从 kafka 消息中获取主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36770641/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com