
apache-kafka - Why does this KStream/KTable topology propagate records that don't pass the filter?


I have the following topology, which:

  • Creates a state store
  • Filters records based on SOME_CONDITION, maps their values to a new entity, and finally publishes those records to another topic, STATIONS_LOW_CAPACITY_TOPIC

However, this is what I'm seeing on STATIONS_LOW_CAPACITY_TOPIC:
    null
    null
    null
    {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
    {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
    null

That is, it's as if it were also publishing to the STATIONS_LOW_CAPACITY_TOPIC topic the records that didn't pass the filter. How is that possible? How can I prevent them from being published?

This is the Kafka Streams code:
    kStream.groupByKey()
        .reduce({ _, newValue -> newValue },
            Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                .withKeySerde(Serdes.Integer())
                .withValueSerde(stationSerde))
        .filter { _, value -> SOME_CONDITION }
        .mapValues { station -> Stats(XXX) }
        .toStream()
        .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

UPDATE: I've simplified the topology and printed the resulting table. For some reason, the final KTable also contains null-valued records corresponding to the upstream records that didn't pass the filter:
    kStream.groupByKey()
        .reduce({ _, newValue -> newValue },
            Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                .withKeySerde(Serdes.Integer())
                .withValueSerde(stationSerde))
        .filter { _, value ->
            val conditionResult = (SOME_CONDITION)
            println(conditionResult)
            conditionResult
        }
        .print()

Log:
    false
    [KTABLE-FILTER-0000000002]: 1, (null<-null)
    false
    [KTABLE-FILTER-0000000002]: 2, (null<-null)
    false
    [KTABLE-FILTER-0000000002]: 3, (null<-null)
    false
    [KTABLE-FILTER-0000000002]: 4, (null<-null)
    true
    [KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)
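(In this print format, each line is [processorName]: key, (newValue<-oldValue); note that every key is forwarded, with a null new value for the ones that failed the filter.)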

Best Answer

The answer lies in the javadoc of KTable.filter(...):

Note that filter for a changelog stream works different to record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.



This explains why I was seeing null-valued (tombstone) records being sent downstream.
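The behavior can be reproduced in isolation with TopologyTestDriver (from kafka-streams-test-utils). Below is a minimal sketch; the topic names "input"/"output", the length predicate, and the serdes are made up for illustration. A key whose record fails the KTable filter still shows up downstream, as a tombstone rather than being silently dropped:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.StreamsConfig
    import org.apache.kafka.streams.TopologyTestDriver
    import org.apache.kafka.streams.kstream.Consumed
    import org.apache.kafka.streams.kstream.Produced
    import java.util.Properties

    fun main() {
        val builder = StreamsBuilder()

        // Changelog-stream filter: dropped records are forwarded as tombstones.
        builder.table("input", Consumed.with(Serdes.Integer(), Serdes.String()))
            .filter { _, value -> value.length > 4 }
            .toStream()
            .to("output", Produced.with(Serdes.Integer(), Serdes.String()))

        val props = Properties().apply {
            put(StreamsConfig.APPLICATION_ID_CONFIG, "tombstone-demo")
            put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")
        }

        TopologyTestDriver(builder.build(), props).use { driver ->
            val input = driver.createInputTopic(
                "input", Serdes.Integer().serializer(), Serdes.String().serializer())
            val output = driver.createOutputTopic(
                "output", Serdes.Integer().deserializer(), Serdes.String().deserializer())

            input.pipeInput(1, "hi")     // fails the predicate
            input.pipeInput(2, "hello")  // passes the predicate

            // Key 1 comes out as a tombstone (null value), not silently dropped:
            // [KeyValue(1, null), KeyValue(2, hello)]
            println(output.readKeyValuesToList())
        }
    }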

To avoid it, I converted the KTable into a KStream and then applied the filter:
    kStream.groupByKey()
        .reduce({ _, newValue -> newValue },
            Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
                .withKeySerde(Serdes.Integer())
                .withValueSerde(stationSerde))
        .toStream()
        .filter { _, value -> SOME_CONDITION }
        .mapValues { station ->
            StationStats(station.id, station.latitude, station.longitude, ...)
        }
        .to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))

Result:

    4    {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
    5    {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
    ...
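This works because a KStream is a plain record stream without delete semantics: a record that does not satisfy the predicate is simply dropped, and there is no downstream result table whose previous state would need to be invalidated with a tombstone.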

Regarding "apache-kafka - Why does this KStream/KTable topology propagate records that don't pass the filter?", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/52086694/
