- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将 Apache Flink 教程中的维基百科编辑流分析重写为 Scala https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html
教程中的代码是
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
result.print();
see.execute();
}
}
下面是我在 scala 中的尝试
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time
object WikipediaAnalytics extends App{
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits = env.addSource(new WikipediaEditsSource());
val keyedEdits = edits.keyBy(event => event.getUser)
val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
(we.getUser, t._2 + we.getByteDiff))
}
这或多或少是到 scala 的字对字转换,基于此 val 结果
的类型应该是 DataStream[(String, Long)]
但是fold()
之后推断出的实际类型相差甚远。
请帮助确定 scala 代码有什么问题
EDIT1:使用 fold[R]
的柯里化(Currying)原理图进行了以下更改,现在类型确认为预期类型,但还是没找到原因
val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))
val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent ) =>
(we.getUser, t._2 + we.getByteDiff))
最佳答案
问题似乎出在折叠上,您必须在累加器初始值之后有一个右括号。当您修复该问题时,代码将无法编译,因为它没有可用于 WikipediaEditEvent 的 TypeInformation。解决这个问题最简单的方法是导入更多的 flink scala API。请参阅下面的完整示例:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time
object WikipediaAnalytics extends App {
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits = see.addSource(new WikipediaEditsSource())
val userEditsVolume: DataStream[(String, Int)] = edits
.keyBy(_.getUser)
.timeWindow(Time.seconds(5))
.fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
userEditsVolume.print()
see.execute("Wikipedia User Edit Volume")
}
关于java - Apache flink 维基百科使用 Scala 编辑分析,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41383878/
好吧,我看过一些帖子提到其他一些关于不使用 SP wiki 的帖子,因为它们很糟糕。 既然我们正在考虑在 SP 中创建 wiki,我需要知道为什么我们不应该让 6 名自动化开发人员来记录各种自动化流程
在 GitLab Wiki 部分,可以查看保存更改的历史记录。但是,当您单击提交链接时,它将显示该保存中存在的整个文件。有谁知道一种方法来区分提交以仅获取两个提交之间的差异? 这类似于它在 merge
我使用了 Wiki API 文档中的一些示例代码,但是当我输入搜索项时,没有任何反应。控制台中没有错误,什么也没有。如果我将 URL 输入到浏览器中,URL 本身就会起作用,所以我认为代码中的某些内容
我想在我的 wiki 中创建一个层次结构,如下所示: General FooPages Foo1 Foo2 Foo3 ODP Bar Baz 我想创建这些页
我正在尝试使用为 Python 制作的 Wikimapia 的 pymapia API,但无法理解如何正确使用它。 import pymapia as PyMapia a = PyMapia.PyMa
我正在开发适用于 iOS 的客户端应用程序,用于在 Mac OS X 服务器(Snow Leopard 和 Lion)上编辑内置的 Wiki/Blog。 看来我们可以使用 MetaWeblog 、At
我正在编写一些 URL 重写软件,我想从多个角度了解哪种 URL 方案更可取: 博客风格:my-chemistry-answer -- 为什么? -- (不可取,技术性) Wiki 风格:My_Che
我一直试图找到一种方法来在 Azure DevOps Wiki 中创建子页面的目录。我从其他 wiki 服务中找到了方法。 在 Confluence 中,他们有一个用于“ child 显示”的宏 我为
我是一名优秀的程序员,十分优秀!