gpt4 book ai didi

java - 了解Kafka流groupBy和window

转载 作者:行者123 更新时间:2023-12-02 05:32:54 26 4
gpt4 key购买 nike

我无法理解 kafka 流中的 groupBy/groupById 和窗口的概念。我的目标是聚合一段时间内(例如 5 秒)的流数据。我的流数据如下所示:

{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}

时间以毫秒(纪元)为单位。这里,我的时间戳位于我的消息中,而不是 key 中。我想平均 5 秒窗口的值。

这是我正在尝试的代码,但似乎我无法让它工作

builder.<String, String>stream("my_topic")
.map((key, val) -> { TimeVal tv = TimeVal.fromJson(val); return new KeyValue<Long, Double>(tv.time, tv.value);})
.groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
.windowedBy(TimeWindows.of(5000))
.count()
.toStream()
.foreach((key, val) -> System.out.println(key + " " + val));

即使主题每两秒生成一次消息,此代码也不会打印任何内容。当我按 Ctrl+C 时,它会打印类似的内容

[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1

这个输出对我来说没有意义。

相关代码:

public class MessageTimeExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
String str = (String)record.value();
TimeVal tv = TimeVal.fromJson(str);
return tv.time;
}
}

public class TimeVal
{
final public long time;
final public double value;
public TimeVal(long tm, double val) {
this.time = tm;
this.value = val;
}
public static TimeVal fromJson(String val) {
Gson gson = new GsonBuilder().create();
TimeVal tv = gson.fromJson(val, TimeVal.class);
return tv;
}
}

问题:

为什么需要将序列化器/反序列化器传递给分组依据。有些重载也采用 ValueStore,那是什么?分组后,数据在分组流中看起来如何?

窗口流与组流有何关系?

以上,我期望以流式传输方式打印。也就是说每5秒缓冲一次,然后计数然后打印。它只打印一次在命令提示符下按 Ctrl+c,即打印然后退出

最佳答案

您的输入数据中似乎没有键(如果这是错误的,请纠正我),而且您似乎想要进行全局聚合?

一般来说,分组是将一个流分割成多个子流。这些子流是按 key 构建的(即每个 key 一个逻辑子流)。您将时间戳设置为代码片段中的键,从而为每个时间戳生成一个子流。我认为这不是有意的。

如果要进行全局聚合,则需要将所有记录映射到单个子流,即为groupBy()中的所有记录分配相同的键。请注意,全局聚合不会扩展,因为聚合必须由单个线程计算。因此,这仅适用于小型工作负载。

将窗口化应用于每个生成的子流以构建窗口,并按窗口计算聚合。窗口是基于 Timestamp 提取器返回的时间戳构建的。看来您已经有一个实现可以为此目的提取值的时间戳。

This code does not print anything even though the topic is generating messages every two seconds. When I press Ctrl+C then it prints something like

默认情况下,Kafka Streams 使用一些内部缓存,并且缓存将在提交时刷新 - 默认情况下每 30 秒发生一次,或者当您停止应用程序时发生一次。您需要禁用缓存才能更早地看到结果(参见 https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html )

Why do you need to pass serializer/deserializer to group by.

因为数据需要重新分配,而这是通过 Kafka 中的主题进行的。请注意,Kafka Streams 是为分布式设置构建的,同一应用程序的多个实例并行运行以水平扩展。

顺便说一句:我们可能也会对这篇关于 Kafka Streams 执行模型的博客文章感兴趣:https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

关于java - 了解Kafka流groupBy和window,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51779405/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com