I need a JDBC sink for my Spark Structured Streaming DataFrame. As far as I can tell, the DataFrame API currently lacks a writeStream implementation for JDBC (in both PySpark and Scala, as of Spark 2.2.0). The only suggestion I have found is to write my own Scala ForeachWriter class, based on this article.
So I modified the simple word-count example from here by adding a custom ForeachWriter class and trying to writeStream to PostgreSQL. The word stream is generated manually from the console (using NetCat: nc -lk -p 9999) and read by Spark from a socket.
Unfortunately, I get a "Task not serializable" error.
APACHE_SPARK_VERSION=2.1.0
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
My Scala code:
//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.

import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJDBC")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

val url = "jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user = "<user name>"
val pwd = "<pass>"
val writer = new JDBCSink(url, user, pwd)

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()
ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
- field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
- object (class $line21.$read$$iw$$iw, $line21.$read$$iw$$iw@24747e0f)
- field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
- object (class $line21.$read$$iw, $line21.$read$$iw@1814ed19)
- field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
- object (class $line21.$read, $line21.$read@13e62f5d)
- field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
- object (class $line25.$read$$iw, $line25.$read$$iw@14240e5c)
- field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
- object (class $line25.$read$$iw$$iw, $line25.$read$$iw$$iw@11e4c6f5)
- field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
- object (class $line25.$read$$iw$$iw$JDBCSink, $line25.$read$$iw$$iw$JDBCSink@6c096c84)
- field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
- object (class org.apache.spark.sql.execution.streaming.ForeachSink, org.apache.spark.sql.execution.streaming.ForeachSink@6feccb75)
- field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
- object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 25 more
Update: the workaround is to load the JDBCSink class definition from a separate file first, then paste the rest of the code:

scala> :load <path_to_a_JDBCSink.scala_file>
scala> :paste
// the code without the JDBCSink class definition

Best Answer
Simply define JDBCSink in a separate file, rather than as an inner class that can capture outer references. In the spark-shell, a class defined interactively is compiled as an inner class of the REPL's line-wrapper objects, so it carries a hidden $outer field that drags the entire session state, including the non-serializable StreamExecution of the running query, into the serialized closure; this is exactly what the serialization stack above shows ($line25.$read$$iw$$iw$JDBCSink with field $outer).
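A minimal sketch of such a standalone file, reusing the class from the question unchanged except that it is now defined at the top level (the target table public.test(col1, col2) is carried over from the question's INSERT statement):

// JDBCSink.scala -- a top-level class has no hidden $outer reference,
// so Spark can serialize it and ship it to the executors.
import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "org.postgresql.Driver"
  var connection: Connection = _
  var statement: Statement = _

  // Called once per partition for each trigger; open the JDBC connection here.
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true
  }

  // Called for every row in the partition.
  override def process(value: Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  // Called at the end of the partition; errorOrNull is non-null on failure.
  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

Loading this with scala> :load JDBCSink.scala before pasting the streaming code (as in the update above) keeps the class out of the REPL's wrapper objects; in a compiled application, placing it in its own source file achieves the same thing.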
Regarding "scala - How to write a JDBC Sink for Spark Structured Streaming [SparkException: Task not serializable]?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45373795/