
scala - Apache Flink : ProcessWindowFunction implementation

Reposted. Author: 行者123. Updated: 2023-12-05 00:46:49

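I am trying to use a ProcessWindowFunction in my Apache Flink project, which is written in Scala. Unfortunately, I cannot even get a basic ProcessWindowFunction to work, like the one used in the Apache Flink documentation.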

This is my code:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSource}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.util.Collector
import scala.collection.TraversableOnce

object StreamingJob {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val eventStream = env.addSource(new OrionSource(9001))

    val processedDataStream = eventStream.flatMap(event => event.entities)
      .map(entity => (entity.id, entity.attrs("temperature").value.asInstanceOf[String]))
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .process(new MyProcessWindowFunction())

    env.execute("Socket Window NgsiEvent")
  }
}


private class MyProcessWindowFunction extends ProcessWindowFunction[(String, String), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, String)], out: Collector[String]): Unit = {
    var count: Int = 0
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

IntelliJ gives me the following hints:

1) This one is shown where the new class object is created:
Type mismatch, expected: ProcessWindowFunction[(String, String), NotInferedR, String, TimeWindow], actual: MyProcessWindowFunction

2) This one is shown directly in the class:
Class 'MyProcessWindowFunction' must either be declared abstract or implement abstract member 'process(key:KEY, context:ProcessWindowFunction.Context, iterable:Iterable<IN>, collector:Collector<OUT>):void' in 'org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction'

Building the code gives me the following error:
Error:(51, 16) type mismatch;
 found   : org.apache.flink.MyProcessWindowFunction
 required: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(String, String),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
      .process(new MyProcessWindowFunction())

I would appreciate any help.

Best answer

After debugging for a while with two other people, we finally found the problem.

In my code, I was using the following import:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

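That import refers to the Java API class. As the compiler error shows, the Scala API's process expects the class of the same name from the scala.function package, so with Scala the correct import seems to be: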
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
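For illustration, here is a minimal sketch of how the window function from the question might look once the Scala import is used. It assumes a Flink release that still ships the Scala DataStream API. The parameter name `elements` and the use of `elements.size` in place of the manual counting loop are my own choices, not part of the original post.

```scala
// Scala API variant of ProcessWindowFunction (note the scala.function package)
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Type parameters: input (id, temperature), output String, key String, window TimeWindow
class MyProcessWindowFunction
    extends ProcessWindowFunction[(String, String), String, String, TimeWindow] {

  // With the Scala import, Iterable here is scala.collection.Iterable,
  // which is what the Scala API's abstract process method declares.
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, String)],
      out: Collector[String]): Unit = {
    // Count the elements that fell into this window for the given key
    val count = elements.size
    out.collect(s"Window ${context.window} count: $count")
  }
}
```

The rest of the job, including `.process(new MyProcessWindowFunction())`, should not need to change. With the Java import, the `Iterable` in the Scala method signature does not line up with the `java.lang.Iterable` that the Java class declares, which would explain IntelliJ's "must either be declared abstract or implement abstract member" hint.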

Regarding scala - Apache Flink : ProcessWindowFunction implementation, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53548308/
