gpt4 book ai didi

scala - pySpark:java.lang.UnsupportedOperationException:未实现类型:StringType

转载 作者:行者123 更新时间:2023-12-02 17:16:10 26 4
gpt4 key购买 nike

在读取不一致的架构写入组 parquet 文件时,我们在架构合并方面遇到问题。切换到手动指定模式时,出现以下错误。任何指针都会有所帮助。

java.lang.UnsupportedOperationException: Unimplemented type: StringType at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)

source_location = "{}/{}/{}/dt={}/{}/*_{}_{}.parquet".format(source_initial,
bucket,
source_prefix,
date,
source_file_pattern,
date,
source_file_pattern)
schema = StructType([
StructField("Unnamed", StringType(), True),StructField("nanos", LongType(), True),StructField("book", LongType(), True),
StructField("X_o", LongType(), True),StructField("Y_o", LongType(), True),StructField("Z_o", LongType(), True),
StructField("Total", DoubleType(), True),StructField("P_v", DoubleType(), True),StructField("R_v", DoubleType(), True),
StructField("S_v", DoubleType(), True),StructField("message_type", StringType(), True),StructField("symbol", StringType(), True),
StructField("date", StringType(), True),StructField("__index_level_0__", StringType(), True)])

print("Querying data from source location {}".format(source_location))
df_raw = spark.read.format('parquet').load(source_location, schema = schema, inferSchema = False,mergeSchema="true")
df_raw = df_raw.filter(df_raw.nanos.between(open_nano,close_nano))
df_raw = df_raw.withColumn("timeInWindow_nano",(fun.ceil(df_raw.nanos/(window_nano))).cast("int"))
df_core = df_raw.groupBy("date","symbol","timeInWindow_nano").agg(fun.sum("Total").alias("Total"),
fun.sum("P_v").alias("P_v"),
fun.sum("R_v").alias("R_v"),
fun.sum("S_v").alias("S_v"))

df_core = df_core.withColumn("P_v",fun.when(df_core.Total < 0,0).otherwise(df_core.P_v))
df_core = df_core.withColumn("R_v",fun.when(df_core.Total < 0,0).otherwise(df_core.R_v))
df_core = df_core.withColumn("S_v",fun.when(df_core.Total < 0,0).otherwise(df_core.S_v))
df_core = df_core.withColumn("P_pct",df_core.P_v*df_core.Total)
df_core = df_core.withColumn("R_pct",df_core.R_v*df_core.Total)
df_core = df_core.withColumn("S_pct",df_core.S_v*df_core.Total)

最佳答案

如果模式不兼容,您将无法在一次加载 中读取 parquet 文件。我的建议是将其分为两个负载,然后在它们兼容时合并数据帧。查看示例代码:

schema1_df = spark.read.parquet('path/to/files/with/string/field.parquet')
schema2_df = spark.read.parquet('path/to/files/with/double/field.parquet')
df = schema2_df.unionAll(schema1.df.withColumn('invalid_col', schema2_df.invalid_col.cast('double')))

关于scala - pySpark:java.lang.UnsupportedOperationException:未实现类型:StringType,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46242983/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com