apache-spark - org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame

I am trying to write a Spark Structured Streaming (2.3) Dataset to ScyllaDB (Cassandra).

My code for writing the Dataset:

  def saveStreamSinkProvider(ds: Dataset[InvoiceItemKafka]) = {
    ds
      .writeStream
      .format("cassandra.ScyllaSinkProvider")
      .outputMode(OutputMode.Append)
      .queryName("KafkaToCassandraStreamSinkProvider")
      .options(
        Map(
          "keyspace" -> namespace,
          "table" -> StreamProviderTableSink,
          "checkpointLocation" -> "/tmp/checkpoints"
        )
      )
      .start()
  }
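
For reference, `start()` returns a `StreamingQuery` handle, so a caller typically blocks on it; a minimal driver sketch (the `invoiceItems` Dataset is a hypothetical name, not from the question):

  // Hypothetical call site: start the streaming sink and block until it stops.
  val query = saveStreamSinkProvider(invoiceItems)
  query.awaitTermination()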

My ScyllaDB streaming sink:

class ScyllaSinkProvider extends StreamSinkProvider {
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): ScyllaSink =
    new ScyllaSink(parameters)
}

class ScyllaSink(parameters: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit =
    data.write
      .cassandraFormat(
        parameters("table"),
        parameters("keyspace")
        //parameters("cluster")
      )
      .mode(SaveMode.Append)
      .save()
}

However, when I run this code, I get an exception:

...
[error] +- StreamingExecutionRelation KafkaSource[Subscribe[transactions_load]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
[error] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
[error] at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[error] Caused by: org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame;
[error] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[error] at org.apache.spark.sql.Dataset.write(Dataset.scala:3103)
[error] at cassandra.ScyllaSink.addBatch(CassandraDriver.scala:113)
[error] at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
...
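
The stack trace points at the cause: on Spark 2.3 the DataFrame handed to `Sink.addBatch` still reports `isStreaming == true`, and `Dataset.write` (Dataset.scala:3103 above) guards against exactly that. A paraphrase of that guard, for illustration only:

  // Paraphrased from Spark 2.3's Dataset.write: any Dataset whose plan is
  // still streaming is rejected before a DataFrameWriter is created.
  def write: DataFrameWriter[T] = {
    if (isStreaming) {
      logicalPlan.failAnalysis(
        "'write' can not be called on streaming Dataset/DataFrame")
    }
    new DataFrameWriter[T](this)
  }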

I have seen a similar question, but that one was about CosmosDB - Spark CosmosDB Sink: org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame

Best Answer

You can convert it to an RDD first and then write it:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink

class ScyllaSink(parameters: Map[String, String]) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema
    // Going through queryExecution.toRdd ensures the same query plan is
    // reused instead of re-analyzing the streaming Dataset.
    val rdd: RDD[Row] = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    // write the RDD to Cassandra
  }
}
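
One way to complete the `// write the RDD to Cassandra` step, not part of the original answer, is to rebuild a non-streaming DataFrame from that RDD and reuse the batch writer from the question; a sketch assuming the DataStax spark-cassandra-connector is on the classpath:

  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.cassandra._ // provides cassandraFormat

  // createDataFrame yields a non-streaming DataFrame, so .write is allowed.
  val batchDf = data.sparkSession.createDataFrame(rdd, schema)
  batchDf.write
    .cassandraFormat(parameters("table"), parameters("keyspace"))
    .mode(SaveMode.Append)
    .save()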

Regarding apache-spark - org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53086376/
