gpt4 book ai didi

java - 从kafka消息中获取主题

转载 作者:搜寻专家 更新时间:2023-10-31 20:33:37 25 4
gpt4 key购买 nike

如何从 kafka 中的消息中识别主题名称。

String[] topics = { "test", "test1", "test2" };
for (String t : topics) {
topicMap.put(t, new Integer(3));
}

SparkConf conf = new SparkConf().setAppName("KafkaReceiver")
.set("spark.streaming.receiver.writeAheadLog.enable", "false")
.setMaster("local[4]")
.set("spark.cassandra.connection.host", "localhost");
;
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(
1000));

/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
.createStream(jssc, "localhost:2181", "test-group",
topicMap);

JavaDStream<MessageAndMetadata> data =
messages.map(new Function<Tuple2<String, String>, MessageAndMetadata>()
{

public MessageAndMetadata call(Tuple2<String, String> message)
{
System.out.println("message ="+message._2);
return null;
}
}

);

我可以从 kafka 生产者那里获取消息。但由于消费者现在消费三个主题,因此需要识别主题名称。

最佳答案

从 Spark 1.5.0 开始,official documentation鼓励从最近的版本开始使用无接收者/直接方法,该方法已经从最近的 1.5.0 实验中毕业。除了其他好东西之外,这个新的 Direct API 允许您轻松获取消息及其元数据。

关于java - 从kafka消息中获取主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30345748/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com