gpt4 book ai didi

java - Kafka流聚合: How to ignore intermediate aggregation results for a Window

转载 作者:行者123 更新时间:2023-12-01 17:50:04 24 4
gpt4 key购买 nike

我们正在使用kafka-stream与时间窗口聚合以计算事件的最终总和。我们已经实现了我们的要求,但中间聚合结果存在问题。根据Kafka内存管理文档(https://kafka.apache.org/11/documentation/streams/developer-guide/memory-mgmt.html),似乎没有办法丢弃这些影响最终结果的中间结果。请考虑以下摘自上述文档的解释。

Use the following example to understand the behaviors with and without record caching. In this example, the input is a KStream<String,Integer> with the records <K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>. The focus in this example is on the records with key == A.

An aggregation computes the sum of record values, grouped by key, for the input and returns a KTable<String, Integer>.

Without caching: a sequence of output records is emitted for key A that represent changes in the resulting aggregation table. The parentheses (()) denote changes, the left number is the new aggregate value and the right number is the old aggregate value: <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>.

With caching: a single output record is emitted for key A that would likely be compacted in the cache, leading to a single output record of <A, (321, null)>. This record is written to the aggregation’s internal state store and forwarded to any downstream operations.

The cache size is specified through the cache.max.bytes.buffering parameter, which is a global setting per processing topology:

根据文档,在不缓存输出记录的情况下使用聚合时会产生增量结果。 (我们注意到,即使有缓存,有时也会发生这种情况)。我们的问题是我们有其他应用程序对这些输出聚合起作用并进行一些计算。因此,当输出具有中间聚合时,这些其他计算就会出错。例如,当我们有 <A (21,1)> 时,我们可能会开始计算其他内容事件(正确的计算应该在<A (321, null)>那个时间窗口上完成。

我们的要求是仅在该窗口上的最终聚合上进行其他计算。我们有以下关于kafka流聚合的问题

  1. 当kakfa输出中间结果时,这些输出不是已经聚合了数据吗?例如,考虑输出 <A, (1, null)>, <A, (21, 1)>, <A, (321, 21)> 。这里是第二个输出事件 <A, (21, 1)>是第三个输出 <A, (321, 21)>已经累计值(value)。这是正确的吗?
  2. 有没有办法识别窗口的中间结果?

最佳答案

要记住的另一件事是提交时间间隔缓存大小控制着结果向下游转发的时间。

例如,如果您的提交间隔为 10 秒,则意味着无论缓存是否已满,缓存中的结果都会被转发(如果启用了日志记录,则写入更改日志主题)。

因此,如果您可以将内存设置得足够高以支持将提交间隔设置为所需的窗口时间,则您可能能够近似得到单个最终结果。当然,这是一种粗粒度的方法,会影响整个拓扑,因此您需要考虑并可能构建一个示例应用程序原型(prototype),看看这种方法是否适合您。

关于java - Kafka流聚合: How to ignore intermediate aggregation results for a Window,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51298162/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com