gpt4 book ai didi

java - 如何仅从kafka源中获取值到spark?

转载 作者:行者123 更新时间:2023-12-02 02:46:49 25 4
gpt4 key购买 nike

我从kafka源获取日志,并将其放入spark中。
保存在我的 hadoop_path 中的日志格式如下所示
{"value":"{\"姓名\":\"艾米\",\"年龄\":\"22\"}"}
{"value":"{\"姓名\":\"金\",\"年龄\":\"26\"}"}

但是,我想让它像
{\"姓名\":\"艾米\",\"年龄\":\"22\"}
{\"姓名\":\"金\",\"年龄\":\"26\"}

任何类型的解决方案都会很棒。 (使用纯Java代码、Spark SQL或Kafka)

        SparkSession spark = SparkSession.builder()
.master("local")
.appName("MYApp").getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");
StreamingQuery queryone = dg.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();

最佳答案

使用以下内容:

Dataframe<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
df.printSchema();
StreamingQuery queryone = df.selectExpr("CAST(value AS STRING)")
.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();

确保架构包含 value 作为列。

关于java - 如何仅从kafka源中获取值到spark?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57138507/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com