
scala - Flink: How to convert the deprecated fold to aggregate?


I am following Flink's quickstart example: Monitoring the Wikipedia Edit Stream.

The example is written in Java, and I implemented it in Scala as follows:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("", 0L)) {
        (acc: (String, Long), event: WikipediaEditEvent) => {
          (event.getUser, acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

However, the fold function has been deprecated in Flink, and aggregate is the recommended replacement.


But I have not found any example or tutorial on how to convert the deprecated fold to aggregate.

Any idea how to do this? Probably not just by swapping in aggregate directly.

Update

I have another implementation as follows:
/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser, e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type for a user with an edit byte count */
  case class UserWithEdits(user: String, edits: Long)
}

I would also like to know how to implement this using a custom AggregateFunction.

Update

I followed this documentation: AggregateFunction, but have the following question:

In the source code of the AggregateFunction interface for version 1.3, you will see that add indeed returns void:

void add(IN value, ACC accumulator);

But for version 1.4's AggregateFunction, it returns the accumulator:

ACC add(IN value, ACC accumulator);

How should I handle this?

The Flink version I am using is 1.3.2, and the documentation for this version does not cover AggregateFunction; version 1.4 is not yet available in the artifactory.
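The practical consequence of the 1.3 signature is that the accumulator must be mutated in place, since add has no return value to carry the updated state, so an immutable tuple cannot serve as the accumulator. The following self-contained sketch mirrors the two interface shapes outside of Flink (Agg13, EditAcc, and FakeEvent are illustrative names, not Flink API):

```scala
// Sketch of the 1.3-style interface, where `add` returns void (Unit).
// This trait only mirrors the shape of Flink's AggregateFunction; it is
// not the real interface.
trait Agg13[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): Unit   // 1.3: returns void
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Stand-in for WikipediaEditEvent
case class FakeEvent(user: String, byteDiff: Int)

// Mutable accumulator: required because `add` cannot return a new value
class EditAcc(var user: String, var bytes: Int)

class SumAggregate13 extends Agg13[FakeEvent, EditAcc, (String, Int)] {
  override def createAccumulator(): EditAcc = new EditAcc("", 0)

  // Update the accumulator in place instead of returning a new one
  override def add(value: FakeEvent, acc: EditAcc): Unit = {
    acc.user = value.user
    acc.bytes += value.byteDiff
  }

  override def getResult(acc: EditAcc): (String, Int) = (acc.user, acc.bytes)

  override def merge(a: EditAcc, b: EditAcc): EditAcc = {
    a.bytes += b.bytes
    a
  }
}
```

With the 1.4 signature, add returns the accumulator, so an immutable tuple works and each call simply returns a new tuple, as in the accepted answer below.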


Best Answer

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] {
  override def createAccumulator() = ("", 0)

  override def add(value: WikipediaEditEvent, accumulator: (String, Int)) =
    (value.getUser, value.getByteDiff + accumulator._2)

  override def getResult(accumulator: (String, Int)) = accumulator

  override def merge(a: (String, Int), b: (String, Int)) = (a._1, a._2 + b._2)
}

object WikipediaAnalysis extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource())

  val result: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)
    .timeWindow(Time.seconds(5))
    .aggregate(new SumAggregate)
    // .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))

  result.print()

  result.map(_.toString())
    .addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema()))

  see.execute("Wikipedia User Edit Volume")
}
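Outside of Flink, one can sanity-check that this aggregate logic produces the same per-window result as the old fold, since both thread a (user, bytes) pair through the events in order. A minimal sketch, using Edit as a stand-in for WikipediaEditEvent:

```scala
// Stand-in for WikipediaEditEvent with matching accessor names
case class Edit(getUser: String, getByteDiff: Int)

// Mirrors SumAggregate.add: fold the event into the (user, bytes) pair
def addEdit(acc: (String, Int), e: Edit): (String, Int) =
  (e.getUser, e.getByteDiff + acc._2)

val window = List(Edit("alice", 10), Edit("alice", -3), Edit("alice", 7))

// Old fold semantics
val folded = window.foldLeft(("", 0)) { (acc, e) => (e.getUser, acc._2 + e.getByteDiff) }

// New aggregate semantics: repeated add, then getResult (identity here)
val aggregated = window.foldLeft(("", 0))(addEdit)

assert(folded == aggregated)  // both are ("alice", 14)
```

The only structural difference is that aggregate splits the fold into createAccumulator, add, and getResult, and additionally requires merge so that Flink can combine partial accumulators (e.g. for session windows).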

Regarding "scala - Flink: How to convert the deprecated fold to aggregate?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47123785/
