gpt4 book ai didi

apache-kafka-streams - 具有抑制功能的 Kafka SessionWindow 仅在有稳定的输入记录流时才发送最终事件

转载 作者:行者123 更新时间:2023-12-03 19:15:27 35 4
gpt4 key购买 nike

如果没有恒定的输入记录流,则似乎带有宽限期和抑制的 session 窗口的 Kafka 流无法输出最终事件。

上下文:我们正在使用变更数据捕获 (CDC) 来监控对旧数据库的更改。当用户使用 UI 进行更改时,数据库事务将更改 1..n 个表。每个 SQL 语句都会产生一条 Kafka 记录。这些需要聚合以创建一个“触发记录”,用于启动昂贵的流程。该过程应在提交遗留数据库中的事务的一秒内启动。只有少数用户在使用旧应用程序,因此事务之间可能存在大量时间。

我们有一个 Kafka Stream 应用程序,它使用 session 窗口和 400 毫秒的不活动间隙来聚合共享相同键(事务 ID)的传入记录,并输出触发记录。

我们有一个可行的解决方案,但触发器记录只写入输出主题,只要其他事务正在运行,以便生成稳定的传入记录流。 即使没有进一步的输入记录,我们也需要关闭窗口并写入触发记录。

工作代码在这里:https://github.com/maxant/kafka-data-consistency/blob/714a44689fd48aa28b05b855638ac7ed50dd5ee9/partners/src/main/java/ch/maxant/kdc/partners/ThroughputTestStream.java#L65

以下是该代码的摘要:

      stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("throughput-test-aggregated");

最初我没有压制,也没有宽限期。
仅使用默认配置,我总是会收到包含所有聚合记录的窗口的最终事件,但在 400 毫秒窗口之后需要 6 秒,这对我们来说等待时间太长了。

为了减少延迟并加快速度,我将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 1,但这会在每次聚合后导致输出记录,而不仅仅是单个输出记录。

我引入了抑制(以及 0ms 的宽限期),以确保只创建一个输出记录。

现在的问题是我只收到一个输出记录,如果新的输入记录在窗口关闭后到达(不管它们的键)。

该测试创建了 10 个输入记录,它们都具有相同的键,相隔 10 毫秒,都在 100 毫秒内。然后它休息 3 秒,让我在一组十个记录后将其关闭。我希望收到一个输出记录,但没有一个输出记录到达,除非我让测试继续运行,以创建第二组输入记录。这个问题是可以重现的。

我已阅读以下文章,但找不到任何描述我所看到内容的内容,即只有在处理了其他记录(无论 key )后,最终记录才会发送到输出主题。
  • https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
  • https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html

  • 即使没有进一步的记录被处理,我必须更改什么才能将最终记录发送到我的输出主题?

    (在 Linux 上使用带有客户端和服务器的 Kafka 2.4.1)

    最佳答案

    更新 : 我的拓扑有错误,已修复

    使用抑制时,我遇到了与您完全相同的问题,这是预期的行为。因为suppress 仅支持使用流时间而不是挂钟时间来发送缓冲记录,所以如果您停止获取新记录,流时间将被卡住并且Suppress不会发出最后一个抑制的窗口。

    我使用的解决方案是使用处理器 API 编写自定义抑制(使用 Transfomer,以便您可以使用 DSL 向下游发送被抑制的记录),并将状态存储用作缓冲区,然后检查哪些窗口应该刷新(或发出)到下游每当有新记录进入或经过一段时间后(使用 WALL_CLOCK_TIME 标点符号),处理器。

    变压器看起来像这样:

    public class SuppressWindowTransformer implements Transformer<Windowed<String>, String, KeyValue<Windowed<String>, String>> {
    private ProcessorContext context;
    private Cancellable cancellable;
    private KeyValueStore<Windowed<String>, String> kvStore;
    @Override
    public void init(ProcessorContext context) {
    this.context = context;
    kvStore = (KeyValueStore) context.getStateStore("suppressed_windowed_transaction");
    cancellable = context.schedule(Duration.ofMillis(100), PunctuationType.WALL_CLOCK_TIME, timestamp -> flushOldWindow());
    }

    @Override
    public KeyValue<Windowed<String>, String> transform(Windowed<String> key, String value) {
    kvStore.put(key, value);//buffer (or suppress) the new in coming window
    flushOldWindow();
    return null;
    }

    private void flushOldWindow() {
    //your logic to check for old windows in kvStore then flush

    //forward (or unsuppressed) your suppressed records downstream using ProcessorContext.forward(key, value)
    }

    @Override
    public void close() {
    cancellable.cancel();//cancel punctuate
    }
    }

    在您的 Stream DSL 中:
    stream.groupByKey()
    .windowedBy(SessionWindows.with(Duration.ofMillis(400)).grace(Duration.ofMillis(0)))
    .aggregate(...)//remove suppress operator and write custom suppress using processor API
    .toStream()
    .transform(SuppressWindowTransformer::new, "suppressed_windowed_transaction")
    .to("throughput-test-aggregated");

    关于apache-kafka-streams - 具有抑制功能的 Kafka SessionWindow 仅在有稳定的输入记录流时才发送最终事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60822669/

    35 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com