gpt4 book ai didi

apache-kafka - KTable 状态存储无限保留

转载 作者:行者123 更新时间:2023-12-04 11:47:32 26 4
gpt4 key购买 nike

我们有以下高级 DSL 处理拓扑:

TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);

KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");


KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");

KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")

简而言之,我们上面所做的是:
  • 使用跳跃窗口计数事件
  • 在结果 KTable 之间进行左连接(由于业务逻辑而离开)
  • 对键和值进行分组和更改:取键的一个组件(Long)并移动到值
  • 将生成的 KTable 聚合为最终的 KTable,聚合对象是从 T 到步骤 1 中连接的两个计数器的映射。请注意,映射的大小不超过 600,通常要小得多。

  • 这个想法是创建窗口事件计数并使用这些窗口键进行连接和聚合操作(在 KTable 的情况下,此类操作没有窗口)

    问题是这样的:
    join和聚合操作的状态存储没有保留机制,导致磁盘(RocksDB)空间爆炸。

    进一步来说:
    (跳跃)窗口会导致键上的笛卡尔积,并且没有删除旧窗口的机制。

    如果 KTable 键没有被窗口化,而只是足够多的唯一键 也会出现同样的问题。

    请注意,支持 table1 和 table2 的状态存储没有空间问题,这是因为管理删除旧窗口的 DSL 为它们提供了一个窗口化存储。
    在连接和聚合中,我们将窗口键视为“任何旧键”,DSL 也这样做并使用非窗口键值存储。

    这个问题与以下内容有关: KAFKA-4212 , KAFKA-4273 , confluent forum question

    这里是否有任何误解的概念?
    有没有一种使用 DSL 实现这种拓扑的简单方法?
    如果没有,使用低级 API 实现它的建议方法是什么?

    最佳答案

    我认为你可以做这样的事情:

    StreamsBuilder builder = new StreamBuilder();
    KStream<K,V> streams = builder.stream(/* pattern for both streams */);

    KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
    /* custom Transformer that set the weaker grouping key right here
    and puts the extracted component into the value before the aggregation;
    additionally (that's why we need a Transformer) get the topic name from
    context object and enrich the value accordingly (ie, third String argument in the output Tuple */);

    KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
    timeWindow,
    /* initializer: return an empty Map;
    aggregator:
    for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
    if not, add new map entry with Pair(0,0)
    take the corresponding Pair from the Map and increase one
    counter depending on the original topic that
    is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);

    示例 :

    假设有两个输入流 s1s2具有以下记录( <TS,key,value> ):
    s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
    s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

    在您的原始程序中,您将首先分别计算两个流(假设大小为 5 的滚动窗口)得到(省略 TS):
    <W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>  
    and
    <W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>

    左连接后你得到(处理所有记录后的结果,省略中间体):
    <<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>

    现在您使用“较弱的键”重新分组,将键部分提取到值中,并将所有条目放入映射中,基于提取的键部分。假设我们根据“字符”和“数字”拆分键(即, k1 被拆分为 k,因为 smallerKey1 是提取的 Long 进入值)。聚合后你得到(我将 map 表示为 (k1 -> v1, k2 - v2) :
    <<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>

    如果这是一个正确的例子(我可能没有理解你的问题描述)。您可以使用上面描述的 transform/groupBy/aggregate 来做同样的事情。输入是:
    s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
    s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>
    transform的结果是(包括 TS):
    <1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
    and
    <1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>

    Note, that Transform actually processes both streams as "one stream" because we used Pattern subscription -- thus, the output is just one stream with interleaving records from both original streams.



    您现在对聚合结果应用相同的窗口( TS 省略)——我们通过交替处理每个原始输入流的一条记录来显示结果)为 inputRecord ==> outputRecord
    <1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
    <1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
    <2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
    <2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
    <3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
    <3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
    <6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
    <11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
    <12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>

    如果将此结果的每个键的最新记录与上面的结果进行比较,您会发现两者是相同的。

    关于apache-kafka - KTable 状态存储无限保留,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47439855/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com