gpt4 book ai didi

java - Kafka - 如何同时使用 filter 和 filternot?

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:04:38 24 4
gpt4 key购买 nike

我有一个从主题获取数据的 Kafka 流,需要将该信息过滤到两个不同的主题。

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

但是,当我这样做时,它会从主题中读取数据两次——不确定随着数据变大这是否会对性能产生任何影响。有没有办法只过滤一次并推送到两个主题?

最佳答案

您的方法是正确的,数据未从主题中读取两次,并且没有内部数据复制正在进行。您的方法的唯一缺点是,两个过滤器谓词都会针对每条记录进行评估——但是,这是非常便宜的,不应该成为性能问题。

但是,您仍然可以通过使用 KStream#branch() 来提高性能,它确实采用多个谓词并依次评估所有谓词并为每个谓词返回一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不再为该单个记录评估进一步的谓词——这确保每个记录被添加到最大一个输出流;或者如果没有谓词匹配)。

因此,您可以只向 branch() 提供两个谓词:第一个与原始 filter() 谓词相同,第二个谓词始终返回

KStream<String, Model> stream = builder.stream(
Serdes.String(),
specificAvroSerde,
"not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
(key, value) -> new Processor().test(key,value),
(key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

但不确定此代码是否比您的原始版本更易读。我想这是一个品味问题,我个人更喜欢你的原始代码,因为它确实更好地表达了语义。

我添加的版本应该稍微提高 CPU 效率,因为对于所有满足谓词的记录,它只被评估一次。对于所有不满足结果的记录,将返回一个简单的 true(即,没有第二个谓词评估)。

如果您知道大多数记录将在 splitStream[1] 中结束,您还可以反转谓词(并使用 splitStream[0] 作为“bad-stream ") 以减少对第二个 true 返回谓词的调用次数。但这些只是微优化,应该无关紧要。

关于java - Kafka - 如何同时使用 filter 和 filternot?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40918158/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com