
java - How do I use JSON arrays as Kafka records in a streaming query (Java)?


I have seen many examples of reading JSON data from a Kafka topic. I can do this successfully if I read a single record from the topic per connection, for example:

{"customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
"product": "Super widget",
"price": 10,
"bought_date": "2019-01-01"
}

The code below works for the use case above:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {

    public static void main(String[] args) throws StreamingQueryException {

        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");

        dataset.printSchema();

        Column col = new Column("json");

        // Parse the JSON string against the struct schema and flatten its fields into columns
        Dataset<Row> customers = dataset.select(functions.from_json(col, schema).as("data")).select("data.*");
        customers.printSchema();

        customers.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}

But this strikes me as inefficient, i.e. connecting to Kafka only to get a single record per connection. It therefore seems to me that it would be more efficient to pass a JSON array of the following form, since each JSON array can hold many "records".

[
  {
    "customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
    "product": "Super widget",
    "price": 10,
    "bought_date": "2019-01-01"
  },
  {
    "customer_id": "498443a2-1479-11ea-8d71-362b9e155667",
    "product": "Food widget",
    "price": 4,
    "bought_date": "2019-01-01"
  }
]

The problem is that I cannot unpack the JSON array and process it. The following code fails:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {

    public static void main(String[] args) throws StreamingQueryException {

        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");

        dataset.printSchema();

        Column col = new Column("json");

        Dataset<Row> customers = dataset.select(functions.from_json(col, schema).as("data"));

        Dataset<Row> data = customers.select(functions.explode_outer(functions.explode_outer(new Column("data"))));
        data.printSchema();

        data.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`data`)' due to data type mismatch: input to function explode should be array or map type, not struct<customer_id:string,product:string,price:int,bought_date:string>;;

Questions:

1) How do I write code that unpacks the JSON array correctly and efficiently? I doubt the approach I took in the failing code above is the best one, but I was trying to follow the many examples I have seen involving functions.explode() and the like.

2) If the failing code miraculously turns out to be the right approach, how do I convert the struct into an array or map?

Best Answer

Spark does not pull one record per connection. The Kafka API polls a batch of records at a time.
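For illustration only (this snippet is not part of the original answer): the Kafka source already reads in batches, and the per-trigger volume can be capped with the maxOffsetsPerTrigger option rather than by packing records into JSON arrays. This reuses the SparkSession and imports from the question's code; the value 10000 is an arbitrary example.

// Sketch: same Kafka source as in the question, with an explicit cap on how many
// offsets are consumed per micro-batch (batching happens either way).
Dataset<Row> batched = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "utilization")
        .option("maxOffsetsPerTrigger", 10000)
        .load()
        .selectExpr("CAST(value AS STRING) as json");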

As far as Kafka best practices go, multiple events should be split into multiple records rather than stuffed into one array, unless they genuinely need to be associated, for example a "shopping cart" record containing a list of "item" entries for an order.

For your code to work, your schema must be an ArrayType (not a struct or map).

StructType schema = new StructType(new StructField[]{
        new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
        new StructField("product", DataTypes.StringType, false, Metadata.empty()),
        new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
});

// ArrayType is in org.apache.spark.sql.types; "false" means the array elements may not be null
ArrayType arrSchema = new ArrayType(schema, false);

Then use the array schema when calling from_json.
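A minimal sketch of how that might look, reusing dataset from the question's code and arrSchema from above (the wiring here is my illustration, not part of the original answer):

// Parse each Kafka value as an array of structs, explode the array so each
// element becomes its own row, then flatten the struct fields into columns.
Column col = new Column("json");

Dataset<Row> customers = dataset
        .select(functions.from_json(col, arrSchema).as("data"))     // data: array<struct<...>>
        .select(functions.explode(new Column("data")).as("record")) // one struct per row
        .select("record.*");                                        // customer_id, product, price, bought_date

customers.printSchema();

customers.writeStream()
        .format("console")
        .start()
        .awaitTermination();

If rows whose value is a null or empty array should still appear (with null fields), functions.explode_outer can be used in place of functions.explode.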

Regarding "java - How do I use JSON arrays as Kafka records in a streaming query (Java)?", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/59130355/
