gpt4 book ai didi

scala - 如何使用已更改的架构从 Spark 写入 Kafka 而不会出现异常?

转载 作者:行者123 更新时间:2023-12-01 00:18:08 24 4
gpt4 key购买 nike

我正在将 Parquet 文件从 Databricks 加载到 Spark:

val dataset = context.session.read().parquet(parquetPath)

然后我执行一些这样的转换:
val df = dataset.withColumn(
columnName, concat_ws("",
col(data.columnName), lit(textToAppend)))

当我尝试将它作为 JSON 保存到 Kafka 时(不是回到 Parquet !):
df = df.select(
lit("databricks").alias("source"),
struct("*").alias("data"))

val server = "kafka.dev.server" // some url
df = dataset.selectExpr("to_json(struct(*)) AS value")
df.write()
.format("kafka")
.option("kafka.bootstrap.servers", server)
.option("topic", topic)
.save()

我收到以下异常:
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file dbfs:/mnt/warehouse/part-00001-tid-4198727867000085490-1e0230e7-7ebc-4e79-9985-0a131bdabee2-4-c000.snappy.parquet. Column: [item_group_id], Expected: StringType, Found: INT32
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:310)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:287)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
at com.databricks.sql.io.parquet.NativeColumnReader.readBatch(NativeColumnReader.java:448)
at com.databricks.sql.io.parquet.DatabricksVectorizedParquetRecordReader.nextBatch(DatabricksVectorizedParquetRecordReader.java:330)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:167)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:299)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:287)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

仅当我尝试读取多个分区时才会发生这种情况。例如在 /mnt/warehouse/目录我有很多 Parquet 文件,每个文件都代表来自 datestamp 的数据.如果我只阅读其中之一,则不会出现异常,但如果我阅读整个目录,则会发生此异常。

我在进行转换时会得到这个,就像上面我更改列的数据类型一样。我怎样才能解决这个问题?我不是要写回 parquet,而是将所有文件从相同的源模式转换为新模式并将它们写入 Kafka。

最佳答案

Parquet 文件似乎有问题。 item_group_id文件中的列并非都是相同的数据类型,有些文件将列存储为字符串,而其他文件则为整数。来自异常的源代码SchemaColumnConvertNotSupportedException我们看到描述:

Exception thrown when the parquet reader find column type mismatches.



可以在 github 上的 Spark 测试中找到一个简单的方法来复制该问题。 :
Seq(("bcd", 2)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet(s"$path/parquet")
Seq((1, "abc")).toDF("a", "b").coalesce(1).write.mode("append").parquet(s"$path/parquet")

spark.read.parquet(s"$path/parquet").collect()

当然,这只会在一次读取多个文件时发生,或者在上面的测试中附加了更多数据。如果读取单个文件,则不会出现列的数据类型之间的不匹配问题。

最简单的方法 解决问题的方法是确保在写入文件时所有文件的列类型都是正确的。

替代 是单独读取所有 Parquet 文件,更改模式以匹配,然后将它们与 union 结合起来.一种简单的方法是调整模式:
// Specify the files and read as separate dataframes
val files = Seq(...)
val dfs = files.map(file => spark.read.parquet(file))

// Specify the schema (here the schema of the first file is used)
val schema = dfs.head.schema

// Create new columns with the correct names and types
val newCols = schema.map(c => col(c.name).cast(c.dataType))

// Select the new columns and merge the dataframes
val df = dfs.map(_.select(newCols: _*)).reduce(_ union _)

关于scala - 如何使用已更改的架构从 Spark 写入 Kafka 而不会出现异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50852866/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com