gpt4 book ai didi

apache-spark - Spark java数据帧字符串无法转换为结构

转载 作者:行者123 更新时间:2023-12-05 06:00:05 26 4
gpt4 key购买 nike

我定义了以下 spark 模式

        StructType state = DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("version", DataTypes.IntegerType, false),
DataTypes.createStructField("value", DataTypes.StringType, false)
});

ArrayType relationship = DataTypes.createArrayType(DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("cid", DataTypes.StringType, false),
DataTypes.createStructField("state", state, false),
}));

StructType cr = DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("cmg", relationship, false)
});

StructType schema = DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("cr", cr, false)
});

如果我将数据框创建为

Row r1 = RowFactory.create("{cr:{cmg:[{cid:\"B06XW5BXJZ\",state:{version:19,value:"approved"}}]}}");
List<Row> rowList = ImmutableList.of(r1);
Dataset<Row> df = spark.sqlContext().createDataFrame(rowList, schema);

代码给出以下错误

The value ({cr:{cmg:[{cid:"B06XW5BXJZ",state:{version:19,value:"approved"}}]}}) of the type (java.lang.String) cannot be converted to struct<cmg:array<struct<cid:string,state:struct<version:int,value:string>>>>

我错过了什么?

最佳答案

当你执行 createDataFrame(rowList, schema) Spark 尝试解释 rowList 中每个元素的内容使用提供的架构。但是,rowList 中的值是字符串,而不是结构化对象,因此 Spark 无法应用模式。

您有多种选择可以将该对象以结构化形式加载到数据框中。

加载数据为json字符串并使用spark解析

String jsonRow = "{cr:{cmg:[{cid:\"B06XW5BXJZ\",state:{version:19,value:\"approved\"}}]}}";
Dataset<Row> df = spark.createDataset(List.of(jsonRow), Encoders.STRING())
.select(functions.from_json(functions.col("value"), schema, Map.of("allowUnquotedFieldNames", "true")));

在这种情况下,它首先创建一个 Dataset<String>其中每一行包含一个字符串列 ( value ),然后使用 from_json spark sql 函数使用您的模式解析 json。

另请注意 allowUnquotedFieldNames=true 的使用选项,必需的,因为在输入字符串中未引用字段名称。

手动创建结构化行并将它们加载到 Dataframe 中

Row structuredRow = RowFactory.create(RowFactory.create(List.of(RowFactory.create("B06XW5BXJZ", RowFactory.create(19, "approved")))));
Dataset<Row> df = spark.createDataFrame(List.of(structuredRow), schema);

这扩展了您使用 RowFactory 的初始尝试手动创建行。行必须反射(reflect)模式中定义的结构(或者更确切地说,模式必须尊重行的结构)。

使用自定义 Java bean 类

类定义
public static class State implements Serializable {
private Integer version;
private String value;
// getters, setters, constructors
}

public static class Relationship implements Serializable {
private String cid;
private State state;
// getters, setters, constructors
}

public static class Cr implements Serializable {
private List<Relationship> cmg;
// getters, setters, constructors
}

public static class RowBean implements Serializable {
private Cr cr;
// getters, setters, constructors
}
使用bean类创建Dataset
RowBean row = new RowBean(new Cr(List.of(new Relationship("B06XW5BXJZ", new State(19, "approved")))));
Dataset<RowBean> ds = spark.createDataset(List.of(row), Encoders.bean(RowBean.class));

在这种情况下,使用自定义 Java bean/Scala 案例类,使用 Encoders.bean() 直接从类结构中提取模式。

关于apache-spark - Spark java数据帧字符串无法转换为结构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67868386/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com