gpt4 book ai didi

scala - 在 Clojure 中编写 Spark Structured Streaming 示例时出错

转载 作者:行者123 更新时间:2023-12-02 03:00:03 25 4
gpt4 key购买 nike

我正在尝试在 Clojure 中重写 Spark Structured Streaming 示例。

该示例是用 Scala 编写的,如下所示:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

(ns flambo-example.streaming-example
(:import [org.apache.spark.sql Encoders SparkSession Dataset Row]
[org.apache.spark.sql.functions]
))

(def spark
(->
(SparkSession/builder)
(.appName "sample")
(.master "local[*]")
.getOrCreate)
)


(def lines
(-> spark
.readStream
(.format "socket")
(.option "host" "localhost")
(.option "port" 9999)
.load
)
)

(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap #(clojure.string/split % #" " ))
))

上面的代码导致以下异常。

;;由 java.lang.IllegalArgumentException 引起
;;找不到匹配的方法:类的 flatMap
;; org.apache.spark.sql.Dataset

我怎样才能避免错误?

最佳答案

你必须遵守签名。 Java Dataset API 提供了 Dataset.flatMap 的两种实现, 一个需要 scala.Function1

def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

第二个是 Spark 自己的 o.a.s.api.java.function.FlatMapFunction
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

前一个对你来说没什么用,但是你应该可以使用后一个。对于 RDD API flambo uses macros to create Spark friendly adapters可以通过 flambo.api/fn 访问- 我不确定这些是否可以直接用于 Datasets ,但如果需要,您应该能够调整它们。

由于您不能依赖隐式 Encoders您还必须提供与返回类型匹配的显式编码器。

总的来说,你需要一些东西:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap f e)
))

哪里 f工具 FlatMapFunctioneEncoder .一个示例实现:
(def words
(-> lines
(.as (Encoders/STRING))
(.flatMap
(proxy [FlatMapFunction] []
(call [s] (.iterator (clojure.string/split s #" "))))
(Encoders/STRING))))

但我想有可能找到一个更好的。

在实践中,我会避​​免输入 Dataset任何并专注于 DataFrame ( Dataset[Row] )。

关于scala - 在 Clojure 中编写 Spark Structured Streaming 示例时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46657192/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com