gpt4 book ai didi

java - 如何在spark中将List的byte[]解码为Dataset

转载 作者:行者123 更新时间:2023-12-02 09:01:38 25 4
gpt4 key购买 nike

我在我的项目中使用spark-sql-2.3.1v、kafka和java8。我正在尝试将接收到的主题 byte[] 转换为 kafka 消费者端的数据集。

详细信息如下

我有

class Company{
String companyName;
Integer companyId;
}

我定义为

public static final StructType companySchema = new StructType(
.add("companyName", DataTypes.StringType)
.add("companyId", DataTypes.IntegerType);

但是消息定义为

class Message{
private List<Company> companyList;
private String messageId;
}

我试图定义为

StructType messageSchema = new StructType()
.add("companyList", DataTypes.createArrayType(companySchema , false),false)
.add("messageId", DataTypes.StringType);

我使用序列化将消息作为 byte[] 发送到 kafka 主题。

我在消费者处成功收到消息字节[]。我正在尝试将其转换为数据集?怎么办?

   Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");

messagesDs.printSchema();

root
|-- companyList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- companyId: integer (nullable = true)
|-- messageId: string (nullable = true)

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
|-- col: struct (nullable = true)
| |-- companyName: string (nullable = true)
| |-- companyId: integer (nullable = true)



Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

出现错误:

线程“main”org.apache.spark.sql.AnalysisException中出现异常:无法解析给定输入列的“companyName”:[col];

如何获取Dataset记录,如何获取?

最佳答案

你的结构在爆炸时被命名为“col”。

由于您的 Bean 类没有“col”属性,因此它会因上述错误而失败。

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];

您可以执行以下选择来获取相关列作为普通列:像这样的事情:

    Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));

我尚未测试语法,但一旦您从每行的结构中获得纯列,就必须立即执行下一步。

关于java - 如何在spark中将List<Objects>的byte[]解码为Dataset<Row>?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60116642/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com