gpt4 book ai didi

scala - 结构化流 - Foreach Sink

转载 作者:行者123 更新时间:2023-12-02 04:06:43 25 4
gpt4 key购买 nike

我基本上是从 Kafka 源读取数据,并将每条消息转储到我的 foreach 处理器(感谢 Jacek 页面提供的简单示例)。

如果这确实有效,我实际上应该在此处的 process 方法中执行一些业务逻辑,但是,这不起作用。我相信 println 无法工作,因为它在执行器上运行,并且无法将这些日志返回到驱动程序。但是,这种插入临时表至少应该可以工作,并向我表明消息实际上已被消耗并处理到接收器。

我在这里缺少什么?

真的在寻找第二双眼睛来检查我的努力:

 val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
.option("subscribe", src_topic)
.load()

val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]

val df = stream.selectExpr("cast (value as string) as json")

val writer = new ForeachWriter[Row] {
val scon = new SConConnection
override def open(partitionId: Long, version: Long) = {
true
}
override def process(value: Row) = {
println("++++++++++++++++++++++++++++++++++++" + value.get(0))
scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
}
override def close(errorOrNull: Throwable) = {
scon.closeConnection
}
}


val yy = df.writeStream
.queryName("ForEachQuery")
.foreach(writer)
.outputMode("append")
.start()

yy.awaitTermination()

最佳答案

感谢 Harald 和其他人的评论,我发现了一些事情,这使我实现了正常的处理行为 -

  1. 使用本地模式测试代码,yarn 并不是调试的最大帮助
  2. 由于某种原因,foreach接收器的process方法不允许调用其他方法。当我将业务逻辑直接放在那里时,它就可以工作。

希望对其他人有帮助。

关于scala - 结构化流 - Foreach Sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44193162/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com