- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们有一个数据流,其中每个元素都是这种类型:
id: String
type: Type
amount: Integer
我们希望聚合此流并每周输出一次 amount
的总和。
当前解决方案:
一个示例 flink 管道如下所示:
stream.keyBy(type)
.window(TumblingProcessingTimeWindows.of(Time.days(7)))
.reduce(sumAmount())
.addSink(someOutput())
输入
| id | type | amount |
| 1 | CAT | 10 |
| 2 | DOG | 20 |
| 3 | CAT | 5 |
| 4 | DOG | 15 |
| 5 | DOG | 50 |
如果窗口在记录 3
和 4
之间结束,我们的输出将是:
| TYPE | sumAmount |
| CAT | 15 | (id 1 and id 3 added together)
| DOG | 20 | (only id 2 as been 'summed')
Id 4
和 5
仍然在 flink 管道中,将在下周输出。
因此下周我们的总产量将是:
| TYPE | sumAmount |
| CAT | 15 | (of last week)
| DOG | 20 | (of last week)
| DOG | 65 | (id 4 and id 5 added together)
新要求:
我们现在还想知道每条记录是在哪一周处理的。换句话说,我们的新输出应该是:
| TYPE | sumAmount | weekNumber |
| CAT | 15 | 1 |
| DOG | 20 | 1 |
| DOG | 65 | 2 |
但我们还想要这样的额外输出:
| id | weekNumber |
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| 4 | 2 |
| 5 | 2 |
如何处理?
flink有什么办法可以实现吗?我想我们会有一个聚合函数,它可以对金额求和,但也可以输出每条记录以及当前周数,但我在文档中找不到执行此操作的方法。
(注意:我们每周处理大约 1 亿条记录,因此理想情况下我们只想在一周内将聚合保持在 flink 的状态,而不是所有单独的记录)
编辑:
我选择了下面 Anton 描述的解决方案:
DataStream<Element> elements =
stream.keyBy(type)
.process(myKeyedProcessFunction());
elements.addSink(outputElements());
elements.getSideOutput(outputTag)
.addSink(outputAggregates())
KeyedProcessFunction 看起来像这样:
class MyKeyedProcessFunction extends KeyedProcessFunction<Type, Element, Element>
private ValueState<ZonedDateTime> state;
private ValueState<Integer> sum;
public void processElement(Element e, Context c, Collector<Element> out) {
if (state.value() == null) {
state.update(ZonedDateTime.now());
sum.update(0);
c.timerService().registerProcessingTimeTimer(nowPlus7Days);
}
element.addAggregationId(state.value());
sum.update(sum.value() + element.getAmount());
}
public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) {
state.update(null);
c.output(outputTag, sum.value());
}
}
最佳答案
reduce 方法有一个变体,它将 ProcessWindowFunction 作为第二个参数。你会像这样使用它:
stream.keyBy(type)
.window(TumblingProcessingTimeWindows.of(Time.days(7)))
.reduce(sumAmount(), new WrapWithWeek())
.addSink(someOutput())
private static class WrapWithWeek
extends ProcessWindowFunction<Event, Tuple3<Type, Long, Long>, Type, TimeWindow> {
public void process(Type key,
Context context,
Iterable<Event> reducedEvents,
Collector<Tuple3<Type, Long, Long>> out) {
Long sum = reducedEvents.iterator().next();
out.collect(new Tuple3<Type, Long, Long>(key, context.window.getStart(), sum));
}
}
通常情况下,ProcessWindowFunction 会传递一个 Iterable,其中包含窗口收集的所有事件,但如果您使用 reduce 或聚合函数来预聚合窗口结果,则只有该单个值会传递到 Iterable。这方面的文档是 here但文档中的示例目前有一个小错误,我已在此处的示例中修复了该错误。
但鉴于对第二个输出的新要求,我建议您放弃使用 Windows 执行此操作的想法,而是使用键控 ProcessFunction .您将需要两个每个键的 ValueState:一个按周计数,另一个用于存储总和。您需要一个每周触发一次的计时器:当它触发时,它应该发出类型、总和和周数,然后递增周数。同时,流程元素方法将简单地输出每个传入事件的 ID 以及周计数器的值。
关于apache-flink - Flink 窗口 : aggregate and output to sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51990582/
我想用 Future 做一个异步工作。 但是下面的 .sink() 闭包 永远不会被调用。 看起来 Future 的实例在它被调用后就被释放了。 Future { promise in
我在我的 aspnet core 应用程序中使用 Serilog 进行日志记录。我需要频繁地将日志事件写入控制台(每秒 300-500 个事件)。我在 docker 容器内运行我的应用程序,并使用 O
为了查看在 foreach() 中运行的函数输出的控制台消息循环我遵循了 this guy 的建议并添加了一个 sink()像这样调用: library(foreach) libra
我正在使用 kafka connect 从 Mongodb -> Elasticsearch 移动数据。 目前,更新的记录作为新文档插入到 Elasticsearch 索引中。但是,我想根据 ID 更
我在 R 中处理以下数据: > str(df) 'data.frame': 369269 obs. of 12 variables: $ bkod : int 110006 110006 1
我正在使用 Tokio 玩异步/等待功能,并在我的 Cargo.toml 中启用了异步/等待功能。 (以及 2018 年版的最新 Rust nightly): tokio = { version =
这个问题在这里已经有了答案: URLSession.shared.dataTaskPublisher not working on IOS 13.3 (3 个回答) 1年前关闭。 这是我的管道: UR
我正在尝试使用flume来使用Twitter Stream API并将该tweet索引到我的elasticsearch中。我将flume.conf设置为使用com.cloudera.flume.sou
我正在尝试将数据从 kafka(最终我们将使用在不同实例上运行的 kafka)发送到 hdfs。我认为 Flume 或某种摄取协议(protocol)对于将数据导入 hdfs 是必要的。所以我们使用c
我怎样才能将输出重定向到某个 txt 文件,但以这样一种方式,以便我可以在逐步生成的同时在控制台中同时看到该输出? 最佳答案 只需使用 sink 的 split=TRUE 参数: sink(file=
我一直在疯狂地尝试让这一切发生,但我就是想不通(初学者)。 正如你所看到的,当你向下滚动时,顶部的头部部分会粘在页面的顶部,但也会溢出一点。这是用 stickyjs 完成的。我也想对头部底部做同样的事
我一直在使用 Serilog.Sinks.Email,我注意到(除了我没有收到电子邮件这一事实),当我执行 Log 语句时没有异常或任何表明失败的信息.即使我为 MailServer 放入垃圾邮件,L
两者的输出 pactl list sink-inputs和 pacmd list-sink-inputs包含一个属性部分: Properties: media.name = "ALSA Pla
我有一个函数f :: ByteString -> String ,并且需要一个 Sink ByteString (ResourceT IO) . 我怎么得到这个? 不幸的是,这些文档不是很有帮助...
我将 RxSwift 与以下类似的东西一起使用 extension Reactive where Base: UIViewController { public var showError:
如何在 Python Spark 结构化流中使用 foreach 来触发输出操作。 query = wordCounts\ .writeStream\ .outputMode('upd
我正在尝试将 R 脚本的错误和警告记录到外部文件中。同时我希望能够在 RStudio 的控制台中看到错误和警告(对开发和调试有用)。我正在尝试使用以下代码: logfile <- file("my_f
我正在使用 R 的 sink() 函数将错误、警告、消息和控制台输出捕获到单个文本文件中。 我想知道是否同时沉没两个 留言 和 输出 类型到单个打开的文件是不好的吗? 我将上述所有内容捕获到一个文件中
我正在测试Serilog.Sinks.Elasticsearch库和ELK堆栈(Elasticsearch&Kibana)的组合,以从我的ASP.NET Core 2.2应用程序中收集日志。 Web应
我基本上是从 Kafka 源读取数据,并将每条消息转储到我的 foreach 处理器(感谢 Jacek 页面提供的简单示例)。 如果这确实有效,我实际上应该在此处的 process 方法中执行一些业务
我是一名优秀的程序员,十分优秀!