- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在关注 Flink 的快速入门示例:Monitoring the Wikipedia Edit Stream .
这个例子是用 Java 编写的,我在 Scala 中实现它,如下所示:
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits.keyBy( _.getUser )
.timeWindow(Time.seconds(5))
.fold(("", 0L)) {
(acc: (String, Long), event: WikipediaEditEvent) => {
(event.getUser, acc._2 + event.getByteDiff)
}
}
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
}
fold
Flink 中的函数已经是
已弃用 ,以及
aggregate
推荐功能。
fold
的示例或教程至
aggregrate
.
aggregrate
.
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits
.map( e => UserWithEdits(e.getUser, e.getByteDiff) )
.keyBy( "user" )
.timeWindow(Time.seconds(5))
.sum("edits")
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
/** Data type for words with count */
case class UserWithEdits(user: String, edits: Long)
}
AggregateFunction
来实现.
AggregateFunction
对于 1.3 版,您将看到
add
确实返回
void
:
void add(IN value, ACC accumulator);
AggregateFunction
, is 正在返回:
ACC add(IN value, ACC accumulator);
1.3.2
并且此版本的文档没有
AggregateFunction
,但在 artifactory 中还没有 1.4 版本。
最佳答案
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] {
override def createAccumulator() = ("", 0)
override def add(value: WikipediaEditEvent, accumulator: (String, Int)) = (value.getUser, value.getByteDiff + accumulator._2)
override def getResult(accumulator: (String, Int)) = accumulator
override def merge(a: (String, Int), b: (String, Int)) = (a._1, a._2 + b._2)
}
object WikipediaAnalysis extends App {
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource())
val result: DataStream[(String, Int)] = edits
.keyBy(_.getUser)
.timeWindow(Time.seconds(5))
.aggregate(new SumAggregate)
// .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
result.print()
result.map(_.toString()).addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema()))
see.execute("Wikipedia User Edit Volume")
}
关于scala - Flink : How to convert the deprecated fold to aggregrate?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47123785/
(按标题道歉,我不能做得更好) 我的问题是找到一些通用的结构或“标准”函数来执行下一件事: xmap :: (a -> b) -> f a -> g b 然后,我们不仅可以映射元素,还可以映射整个结构
我正在尝试构建一个分为首屏和非首屏部分的页面,让查看者有一种进入海底的印象。我遇到了一个绊脚石,因为当涉及到 CSS 定位的概念时,我的脑子里有些东西就是拒绝点击。 最终,我试图在折叠下方添加另一个部
我正在尝试使用 Rayon启动一系列顶级线程以递归调用模拟函数。该代码在使用 channel 发送和接收时有效,因此它是多线程兼容的,但它无法使用 par_iter() 进行编译。 fn simula
所以我的任务是在 Scheme 中使用 fold-left 或 fold-right 实现最基本版本的 'map' 函数和 'filter' 函数。我很难理解这些函数到底在做什么。这是我所拥有的: (
我的序列、数组等的顺序很重要。我曾尝试在 List、Seq 和 Array 之间进行转换,以查看是否存在差异,并且在每种情况下都将顺序颠倒。 例如,我有一个 [名词] [动词] [形容词] 的序列,它
我发现自己一遍又一遍地重复一个模式,我想把它抽象出来。我相当有信心 coq 具有足够的表现力来捕捉模式,但我在弄清楚如何做到这一点时遇到了一些麻烦。我正在定义一种编程语言,它具有表示句法术语的相互递归
我有一个类型如下的函数: union :: a -> a -> a 和a有 加性 属性(property)。所以我们可以看成union作为 (+) 的一个版本 比如说,我们有 [a] ,并希望执行并行
我编写了以下代码,它创建了一个无限的斐波那契数列: fibs = 1:1:fib 1 1 where fib a b = a+b:fib b (a+b) 上面的代码可以用foldl写吗?或 fol
(rust noob here;我试图理解在高阶函数情况下什么可以/不能/应该/不应该通过引用传递) let a = [1, 2, 3]; 此调用有效: let sum = a.iter().fold
我对 Haskell 基础知识之一有疑问:Fold + 匿名函数 我正在使用 foldl 开发 bin2dec 程序。 解决方案如下所示: bin2dec :: String -> Int bin2d
让我们假设 fn scan(int, int) -> int。 使用时 fn count(x: int, y: int) -> int { scan(x - 1, y - 1) + scan(
我正在尝试实现一个通用的缺点列表,它比本书第 15 章中使用的列表更高级: use std::fmt::Debug; #[derive(Debug)] enum List { Nil,
我有一个网站,顶部有一个水平的 ul/li 按钮。如果有人缩小窗口,按钮会向下折叠成两排。有点丑。 举个例子: http://www.redolog.com 我想知道是否有一个布局指令说“看,你必须至
我正在使用下面的代码,改编自 this线。我能够获取文件夹中的文件列表,但最后出现段错误。知道为什么会这样吗? 有没有办法在for循环中获取当前文件(完整路径)的std::string? boost
问题如何以编程方式确定“折叠”(浏览器显示多少垂直内容)? “折叠”定义为您再也看不到/必须滚动的地方。 我曾尝试使用 JavaScript 来简单地确定浏览器窗口大小来确定折叠;不幸的是 - 这不能
人们普遍认为 1024x768 浏览器是目标,960 - 980 像素的宽度是可以接受的。 (我个人更喜欢 960 的 chrome,但没有争论的意义。) 我的问题是 - 通常可以假设用户的窗口高度是
我正忙于学习 F# 并在玩弄 Seq.fold。任何人都可以解释为什么以下两个调用本质上不相同,一个错误而另一个错误。 这样调用: Seq.fold (fun state input -> state
我相信这是有充分理由的,但我没有看到。 Fold在(说)List返回 the result of applying fold operator op between all the elements
我的家庭作业进行得非常顺利,直到我偶然发现了最后一项任务。 首先,我必须定义一个自定义 List结构体: data List a = Nil | Cons a (List a) deriving Sh
因此,有一种称为“折叠的通用属性”的东西,确切地说如下: g [] = i; g (x:xs) = f x (g xs) g = fold f i 但是,正如您现在可能的那样,有像 dropWhil
我是一名优秀的程序员,十分优秀!