
scala - Registering a UDF in a Spark Streaming application


I have a Spark Streaming application using SparkSQL, written in Scala, that attempts to register a UDF after obtaining an RDD. I get the error below. Is it not possible to register UDFs in a Spark Streaming application?

Here is the code snippet that throws the error:

sessionStream.foreachRDD((rdd: RDD[String], time: Time) => {
  val sqlcc = SqlContextSingleton.getInstance(rdd.sparkContext)
  // Register a zero-argument UDF that returns a fresh UUID string
  sqlcc.udf.register("getUUID", () => java.util.UUID.randomUUID().toString)
  ...
})
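For context, SqlContextSingleton here is presumably the lazily instantiated singleton helper recommended by the Spark Streaming programming guide, so that every batch reuses a single SQLContext. A minimal sketch of such a helper, assuming Spark 1.x:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SqlContextSingleton {
  @transient private var instance: SQLContext = _

  // Lazily create one SQLContext per JVM so that each
  // foreachRDD batch reuses it instead of building a new one.
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}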

Here is the error thrown when I try to register the function:
Exception in thread "pool-6-thread-6" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:173)
at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:164)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Best Answer

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{sum, udf}

sessionStream.foreachRDD((rdd: RDD[Event], time: Time) => {
  // Truncate a millisecond timestamp down to the start of its minute
  val f = (t: Long) => t - t % 60000

  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.toDF()

  // Wrap the function as a column-level UDF rather than registering it by name
  val per_min = udf(f)
  val grouped = df.groupBy(
      per_min(df("created_at")) as "created_at",
      df("blah"),
      df("status")
    ).agg(sum("price") as "price", sum("payout") as "payout", sum("counter") as "counter")
  ...
})

This works fine for me. Instead of registering the UDF by name on the SQLContext, it is defined with org.apache.spark.sql.functions.udf and applied directly through the DataFrame API.
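Applied to the question's original getUUID case, the same approach would look roughly like the sketch below (an untested adaptation assuming Spark 1.x; the column names value and uuid are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.streaming.Time

sessionStream.foreachRDD((rdd: RDD[String], time: Time) => {
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Zero-argument UDF returning a random UUID string, used as a
  // column expression instead of being registered by name.
  val getUUID = udf(() => java.util.UUID.randomUUID().toString)

  // Wrap each String in a Tuple1 so the RDD converts cleanly to a DataFrame
  val df = rdd.map(Tuple1(_)).toDF("value")
  df.withColumn("uuid", getUUID()).show()
})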

Regarding scala - registering a UDF in a Spark Streaming application, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/30876021/
