gpt4 book ai didi

java - 如何使用 JavaSparkContext 处理来自 Kafka 的记录中带有文件名的文件?

转载 作者:行者123 更新时间:2023-11-29 04:30:15 25 4
gpt4 key购买 nike

在我的应用程序中,有一个 Web UI 应用程序在完成向 Kafka 的文件上传过程后发送文件路径。

我有一个 Spark Streaming 应用程序,它使用 JavaSparkContextJavaPairInputDStream 从 Kafka 中提取消息(因此它接收文件路径,但也可能有多个文件路径)。

我必须并行处理这些文件,并且需要将结果发送到另一个 kafka 流:

SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]");
sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("topic1");

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

directKafkaStream.foreachRDD(rdd -> {

rdd.collect().forEach((t) -> {
sendMessage(sc, t._2());
});
});

ssc.start();
ssc.awaitTermination();

sendMessage 将发送文件中的数据。

在上面的实现中,我在 foreachRDD 方法中使用了 JavaSparkContext,这不是最佳实践。我想并行处理文件。

最佳答案

我将创建一个函数 sendMessage,它将是一个纯 Kafka 生产者(不依赖于 Spark,尤其是 JavaSparkContext),它将向 Kafka 发送消息主题或获取要发送的所有消息的迭代器。

参见 official documentation of Apache Kafka .

使用纯 Kafka 生产者作为 sendMessage 我将在 Spark Streaming 的转换中执行以下操作(内联注释应该为您提供一些关于每一行发生的事情的提示):

def sendMessage(message: String) = {
println(s"Sending $message to Kafka")
}
dstream.map(_.value).foreachRDD { rdd =>
println(s"Received rdd: $rdd with ${rdd.count()} records")
// take paths from RDD that contains Kafka records with the file names
val files = rdd.collect()
files.foreach { f =>
// read a file `f` using Spark Core's RDD API
rdd.sparkContext.textFile(f).map { line =>
// do something with line
// this is the place for a pure Spark transformation
// it's as if you were outside Spark Streaming
println(line)
line
}.foreachPartition { linesAfterProcessingPerPartition =>
// send lines to Kafka
// they have been processed using Spark
linesAfterProcessingPerPartition.foreach { line =>
sendMessage(message = line)
}
}
}
}

我确信代码可以变得更清晰,但那是 Scala,而你使用 Java,所以我就到此为止。


我强烈建议使用 Spark SQL's Structured Streaming因为它将很快取代 Spark Streaming 并成为 Spark 中的流式 API。

关于java - 如何使用 JavaSparkContext 处理来自 Kafka 的记录中带有文件名的文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44044317/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com