
java - How to access the file path in a record from Kafka and create a Dataset from it?


I am using Java.

I am receiving file paths via Kafka messages. I need to load each file into a Spark RDD, process it, and dump it to HDFS.

I am able to retrieve the file path from the Kafka message. I want to create a Dataset/RDD over that file.

I cannot run a map function on the Kafka message Dataset: it fails with an NPE because the SparkContext is not available on the workers.

I cannot run foreach on the Kafka message Dataset. It fails with:

Queries with streaming sources must be executed with writeStream.start();

I cannot collect the data received from the Kafka message Dataset either, because it fails with

Queries with streaming sources must be executed with writeStream.start();;

I imagine this must be a fairly common use case and must already be running in many setups.

How can I load the file as an RDD from the path I receive in the Kafka message?

SparkSession spark = SparkSession.builder()
    .appName("MyKafkaStreamReader")
    .master("local[4]")
    .config("spark.executor.memory", "2g")
    .getOrCreate();

// Create a Dataset representing the stream of input lines from Kafka
Dataset<String> kafkaValues = spark.readStream()
    .format("kafka")
    .option("spark.streaming.receiver.writeAheadLog.enable", true)
    .option("kafka.bootstrap.servers", Configuration.KAFKA_BROKER)
    .option("subscribe", Configuration.KAFKA_TOPIC)
    .option("fetchOffset.retryIntervalMs", 100)
    .option("checkpointLocation", "file:///tmp/checkpoint")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());

Dataset<String> messages = kafkaValues.map(x -> {
    ObjectMapper mapper = new ObjectMapper();
    String m = mapper.readValue(x.getBytes(), String.class);
    return m;
}, Encoders.STRING());

// ====================
// TEST 1 : FAILS
// ====================
// CODE TRYING TO execute MAP on the received Dataset
// This fails with a NullPointerException because "spark" is not available on the worker nodes

/*
Dataset<String> statusRDD = messages.map(message -> {

    // BELOW STATEMENT FAILS
    Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message);
    Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates();
    dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation());
    return getHdfsLocation();

}, Encoders.STRING());

StreamingQuery query2 = statusRDD.writeStream().outputMode("append").format("console").start();
*/

// ====================
// TEST 2 : FAILS
// ====================
// CODE BELOW FAILS WITH EXCEPTION
// "Queries with streaming sources must be executed with writeStream.start();;"
// when trying to process the deduplication on the worker side using foreach()
/*
JavaRDD<String> messageRDD = messages.toJavaRDD();

messageRDD.foreach(message -> {

    Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message);
    Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates();
    dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation());

});
*/

// ====================
// TEST 3 : FAILS
// ====================
// CODE TRYING TO COLLECT ALSO FAILS WITH EXCEPTION
// "Queries with streaming sources must be executed with writeStream.start();;"
// List<String> mess = messages.collectAsList();
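
For context, all three failures come from the same Structured Streaming rule: a streaming Dataset such as messages can only be consumed by a query started with writeStream.start(), which is exactly what the exception says. A minimal sketch (assuming the messages Dataset defined above) that would at least run and dump the received paths to the console looks like this:

// A streaming Dataset cannot be collect()-ed or converted to a JavaRDD;
// it must be consumed through a streaming query.
StreamingQuery debugQuery = messages.writeStream()
    .outputMode("append")
    .format("console")
    .start();

debugQuery.awaitTermination();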

Any ideas on how to read the file path from the message and create an RDD over that file?

Best Answer

In Structured Streaming, I don't think there is a way to materialize the data from one stream and use it as a parameter for a Dataset operation.

In the Spark ecosystem, this is possible by combining Spark Streaming with Spark SQL (Datasets): we use Spark Streaming to consume the Kafka topic, and then, with Spark SQL, we load the corresponding file and apply the intended process.

Such a job would look roughly like this (it's in Scala; Java code would follow the same structure, only the actual code is a bit more verbose):

// configure and create the Spark session
val spark = SparkSession
  .builder
  .config(...)
  .getOrCreate()

// create a streaming context with a 30-second batch interval - adjust as required
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(30))

// this uses the Kafka 0.8 client; the Kafka 0.10 client has some subscription differences
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> kafkaBootstrapServer,
  "group.id" -> "job-group-id",
  "auto.offset.reset" -> "largest",
  "enable.auto.commit" -> (false: java.lang.Boolean).toString
)

// create a Kafka direct stream
val topics = Set("topic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics)

// extract the values from the Kafka messages
val dataStream = stream.map { case (id, data) => data }

// process the data
dataStream.foreachRDD { dataRDD =>
  // get all data received in the current interval.
  // We are assuming that this data fits in memory.
  // We're not processing a million files per second, are we?
  val files = dataRDD.collect()
  files.foreach { file =>
    // this is the process proposed in the question --
    // notice how we have access to the spark session in the context of the foreachRDD
    val fileDataset = spark.read.option("header", "true").csv(file)
    val dedupedFileDataset = fileDataset.dropDuplicates()
    // this can probably be written in terms of the dataset api
    // dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation())
    dedupedFileDataset.write.format("text").mode("overwrite").save(getHdfsLocation())
  }
}

// start the streaming process
streamingContext.start()
streamingContext.awaitTermination()
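
A Java version following the same structure might look roughly like the sketch below. It assumes the spark-streaming-kafka-0-10 integration (the standard direct-stream API for Java, whereas the Scala example above uses the 0.8 client), and it reuses Configuration.KAFKA_BROKER, Configuration.KAFKA_TOPIC, and getHdfsLocation() from the question's code; adjust these to your setup.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

SparkSession spark = SparkSession.builder()
    .appName("MyKafkaStreamReader")
    .master("local[4]")
    .getOrCreate();

// streaming context with a 30-second batch interval, mirroring the Scala example
JavaStreamingContext jssc = new JavaStreamingContext(
    JavaSparkContext.fromSparkContext(spark.sparkContext()), Durations.seconds(30));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", Configuration.KAFKA_BROKER);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "job-group-id");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

// direct stream over the topic that carries the file paths
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(
        Arrays.asList(Configuration.KAFKA_TOPIC), kafkaParams));

// the message value is the file path
JavaDStream<String> paths = stream.map(ConsumerRecord::value);

paths.foreachRDD(pathRDD -> {
    // foreachRDD runs on the driver, so the SparkSession is usable here;
    // collect() assumes the batch of paths fits in driver memory
    List<String> files = pathRDD.collect();
    for (String file : files) {
        Dataset<Row> fileDataset = spark.read().option("header", "true").csv(file);
        Dataset<Row> deduped = fileDataset.dropDuplicates();
        // written as CSV here, since the "text" source expects a single string column
        deduped.write().mode("overwrite").csv(getHdfsLocation());
    }
});

// start the streaming process
jssc.start();
jssc.awaitTermination();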

Regarding "java - How to access the file path in a record from Kafka and create a Dataset from it?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46423009/
