gpt4 book ai didi

apache-kafka-streams - KStream 和 KTable 之间的时间语义

转载 作者:行者123 更新时间:2023-12-03 16:50:47 25 4
gpt4 key购买 nike

我正在尝试构建以下拓扑:

  • 使用 Debezium 连接器,我拉了 2 个表(我们称它们为表 A 和 DA)。根据 DBZ,存储表行的主题具有结构 { before: "...", after: "..."}。
  • 我的拓扑的第一步是从这两个“表”主题创建“干净的”KStreams。那里的子拓扑大致如下:

  • private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
    StreamsBuilder builder, Properties streamsConfig) {
    return builder
    .stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
    .filter((key, envelope) -> [ some filtering condition ] )
    .map((key, envelope) -> [ maps to TABLE_A.Value ] )
    .through(tableRowByIdTopicName);
    }
  • 请注意,我明确分配了记录时间,因为表行在最初发布后将被 CDC 标记为“年”。该函数目前正在做的是伪造从 2010-01-01 开始的时间,并使用 AtomicInteger ,为每个消耗的实体增加 1 毫秒。它对表 A 执行此操作,但对 DA 不执行此操作(稍后我将解释原因)。
  • 拓扑的第 2 阶段是基于表 A 的“已清理”主题构建 1 个 KTable,如下所示:

  • private static KTable<String, EntityInfoList> getEntityInfoListById(
    KStream<String, TABLE_A.Value> tableAByIdStream) {
    return tableAByIdStream
    .map((key, value) -> [ some mapping ] )
    .groupByKey()
    .aggregate(() -> [ builds up a EntityInfoList object ] ));
    }
  • 最后,在准备好 KTable 后,我将通过 DA 加入 KStream,如下所示:

  • private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
    KStream<String, Table_DA.Value> tableDAStream,
    KTable<String, EntityInfoList> tableA_KTable) {

    KStream<String, Table_DA>[] branches = tableDAStream.branch(
    (key, value) -> [ some logic ],
    (key, value) -> true);

    KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
    .join(
    tableA_KTable,
    (streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
    .flatMap((key, listValue) -> [ some logic to flatten it ]));

    [ similar logic with branch[1] ]
    }

    我的问题是,尽管我“伪造”了来自 ​​Table_A 主题的记录的时间(我已经验证它们使用 kafkacat 引用了 2010/01/01)和 Table_DA 中的条目(连接的流端) ) 在今天 '2019/08/14' 附近有时间戳),在将 Table_A 中的所有记录摄取到 KTable 之前,Kafka Streams 似乎不会从 Table_DA KStream 中读取任何条目。

    因此,我没有预期的所有“加入命中”,而且它也是不确定的。我基于这句话的理解 What are the differences between KTable vs GlobalKTable and leftJoin() vs outerJoin()?正好相反:

    For stream-table join, Kafka Stream align record processing ordered based on record timestamps. Thus, the update to the table are aligned with the records of you stream.



    到目前为止,我的经验是这不会发生。我还可以很容易地看到我的应用程序在消耗了 Table_DA 流中的所有条目后如何继续通过 Table_A 主题方式搅动(它恰好小了 10 倍)。

    难道我做错了什么?

    最佳答案

    时间戳同步是 2.1.0 发布之前的最大努力(参见 https://issues.apache.org/jira/browse/KAFKA-3514)。

    从 2.1.0 开始,时间戳是严格同步的。但是,如果一个输入没有任何数据,Kafka Streams 将“强制”处理,如 KIP-353 中所述。以避免永远阻塞。如果您有突发输入并且想在一个输入没有数据的情况下“阻止”处理一段时间,您可以增加配置参数max.task.idle.ms (默认为 0 ),通过 KIP-353 在 2.1.0 中引入。

    关于apache-kafka-streams - KStream 和 KTable 之间的时间语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57498201/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com