- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我无法理解 kafka 流中的 groupBy/groupById 和窗口的概念。我的目标是聚合一段时间内(例如 5 秒)的流数据。我的流数据如下所示:
{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}
时间以毫秒(纪元)为单位。这里,我的时间戳位于我的消息中,而不是 key 中。我想平均 5 秒窗口的值。
这是我正在尝试的代码,但似乎我无法让它工作
builder.<String, String>stream("my_topic")
.map((key, val) -> { TimeVal tv = TimeVal.fromJson(val); return new KeyValue<Long, Double>(tv.time, tv.value);})
.groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
.windowedBy(TimeWindows.of(5000))
.count()
.toStream()
.foreach((key, val) -> System.out.println(key + " " + val));
即使主题每两秒生成一次消息,此代码也不会打印任何内容。当我按 Ctrl+C 时,它会打印类似的内容
[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1
这个输出对我来说没有意义。
相关代码:
public class MessageTimeExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
String str = (String)record.value();
TimeVal tv = TimeVal.fromJson(str);
return tv.time;
}
}
public class TimeVal
{
final public long time;
final public double value;
public TimeVal(long tm, double val) {
this.time = tm;
this.value = val;
}
public static TimeVal fromJson(String val) {
Gson gson = new GsonBuilder().create();
TimeVal tv = gson.fromJson(val, TimeVal.class);
return tv;
}
}
问题:
为什么需要将序列化器/反序列化器传递给分组依据。有些重载也采用 ValueStore,那是什么?分组后,数据在分组流中看起来如何?
窗口流与组流有何关系?
以上,我期望以流式传输方式打印。也就是说每5秒缓冲一次,然后计数然后打印。它只打印一次在命令提示符下按 Ctrl+c,即打印然后退出
最佳答案
您的输入数据中似乎没有键(如果这是错误的,请纠正我),而且您似乎想要进行全局聚合?
一般来说,分组是将一个流分割成多个子流。这些子流是按 key 构建的(即每个 key 一个逻辑子流)。您将时间戳设置为代码片段中的键,从而为每个时间戳生成一个子流。我认为这不是有意的。
如果要进行全局聚合,则需要将所有记录映射到单个子流,即为groupBy()
中的所有记录分配相同的键。请注意,全局聚合不会扩展,因为聚合必须由单个线程计算。因此,这仅适用于小型工作负载。
将窗口化应用于每个生成的子流以构建窗口,并按窗口计算聚合。窗口是基于 Timestamp
提取器返回的时间戳构建的。看来您已经有一个实现可以为此目的提取值的时间戳。
This code does not print anything even though the topic is generating messages every two seconds. When I press Ctrl+C then it prints something like
默认情况下,Kafka Streams 使用一些内部缓存,并且缓存将在提交时刷新 - 默认情况下每 30 秒发生一次,或者当您停止应用程序时发生一次。您需要禁用缓存才能更早地看到结果(参见 https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html )
Why do you need to pass serializer/deserializer to group by.
因为数据需要重新分配,而这是通过 Kafka 中的主题进行的。请注意,Kafka Streams 是为分布式设置构建的,同一应用程序的多个实例并行运行以水平扩展。
顺便说一句:我们可能也会对这篇关于 Kafka Streams 执行模型的博客文章感兴趣:https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
关于java - 了解Kafka流groupBy和window,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51779405/
我开始在 Ethereum blockchain 上了解如何开发智能合约以及如何写 web-script用于与智能合约交互(购买、销售、统计......)我得出了该怎么做的结论。我想知道我是否正确理解
我正在 UIView 中使用 CATransform3DMakeRotation,并且我正在尝试进行 45º,变换就像向后放置一样: 这是我拥有的“代码”,但显然没有这样做。 CATransform3
我目前正在测试 WebRTC 的功能,但我有一些脑逻辑问题。 WebRTC 究竟是什么? 我只读了“STUN”、“P2P”和其他...但是在技术方面什么是正确的 WebRTC(见下一个) 我需要什么
我在看 DelayedInit在 Scala in Depth ... 注释是我对代码的理解。 下面的 trait 接受一个非严格计算的参数(由于 => ),并返回 Unit .它的行为类似于构造函数
谁能给我指出一个用图片和简单的代码片段解释 WCF 的资源。我厌倦了谷歌搜索并在所有搜索结果中找到相同的“ABC”文章。 最佳答案 WCF 是一项非常复杂的技术,在我看来,它的文档记录非常少。启动和运
我期待以下 GetArgs.hs打印出传递给它的参数。 import System.Environment main = do args main 3 4 3 :39:1: Coul
private int vbo; private int ibo; vbo = glGenBuffers(); ibo = glGenBuffers(); glBindBuffer(GL_ARRAY_
我正在尝试一个 for 循环。我添加了一个 if 语句以在循环达到 30 时停止循环。 我见过i <= 10将运行 11 次,因为循环在达到 10 次时仍会运行。 如果有设置 i 的 if 语句,为什
我正在尝试了解 WSGI 的功能并需要一些帮助。 到目前为止,我知道它是一种服务器和应用程序之间的中间件,用于将不同的应用程序框架(位于服务器端)与应用程序连接,前提是相关框架具有 WSGI 适配器。
我是 Javascript 的新手,我正在尝试绕过 while 循环。我了解它们的目的,我想我了解它们的工作原理,但我在使用它们时遇到了麻烦。 我希望 while 值自身重复,直到两个随机数相互匹配。
我刚刚偶然发现Fabric并且文档并没有真正说明它是如何工作的。 我有根据的猜测是您需要在客户端和服务器端都安装它。 Python 代码存储在客户端,并在命令运行时通过 Fabric 的有线协议(pr
我想了解 ConditionalWeakTable .和有什么区别 class ClassA { static readonly ConditionalWeakTable OtherClass
关闭。这个问题需要更多focused .它目前不接受答案。 想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post . 5年前关闭。 Improve this questi
我还没有成功找到任何可以引导我理解 UIPickerView 和 UIPickerView 模型的好例子。有什么建议吗? 最佳答案 为什么不使用默认的 Apple 文档示例?这是来自苹果文档的名为 U
我在看foldM为了获得关于如何使用它的直觉。 foldM :: Monad m => (a -> b -> m a) -> a -> [b] -> m a 在这个简单的例子中,我只返回 [Just
答案What are _mm_prefetch() locality hints?详细说明提示的含义。 我的问题是:我想要哪一个? 我正在处理一个被重复调用数十亿次的函数,其中包含一些 int 参数。
我一直在读这个article了解 gcroot 模板。我明白 gcroot provides handles into the garbage collected heap 然后 the handle
提供了一个用例: 流处理架构;事件进入 Kafka,然后由带有 MongoDB 接收器的作业进行处理。 数据库名称:myWebsite集合:用户 并且作业接收 users 集合中的 user 记录。
你好 我想更详细地了解 NFS 文件系统。我偶然发现了《NFS 图解》这本书,不幸的是它只能作为谷歌图书提供,所以有些页面丢失了。有人可能有另一个很好的资源,这将是在较低级别上了解 NFS 的良好开始
我无法理解这个问题,哪个更随机? rand() 或: rand() * rand() 我发现这是一个真正的脑筋急转弯,你能帮我吗? 编辑: 凭直觉,我知道数学答案是它们同样随机,但我忍不住认为,如果您
我是一名优秀的程序员,十分优秀!