gpt4 book ai didi

apache-kafka - Kafka Streams – 在同一主题上获取 KTable 和 KStream 的最佳方式?

转载 作者:行者123 更新时间:2023-12-04 18:27:18 26 4
gpt4 key购买 nike

我对 Kafka Streams (0.10.1.1) 有疑问。我正在尝试创建一个 KStreamKTable在同一个话题上。

我尝试的第一种方法很简单,调用 KStreamBuilder同一主题的流和表的方法。这导致

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.

好的,这似乎是 Kafka Streams 内置的一些限制。

我的第二种方法最初是创建一个 KTable并使用 toStream()方法就可以了。这有 KTables 的问题。做一些内部缓冲/刷新,所以如果一个键出现多次,输出流不会反射(reflect)所有输入元素,就像我的例子一样。这是一个问题,因为我正在计算 key 的出现次数。

似乎可行的方法是最初创建一个 KStream ,按键分组,然后通过丢弃旧聚合并仅保留新值来“减少”它。我对这种方法不太满意,因为 a) 它看起来很复杂 b) Reducer接口(interface)没有指定哪一个是已经聚合的值,哪一个是新的。我按照惯例去了,保留了第二个,但是……嗯。

所以问题归结为:有更好的方法吗?我错过了一些非常明显的东西吗?

请记住,我没有在处理正确的用例——这只是我了解 Streams-API。

最佳答案

关于添加主题两次:这是不可能的,因为 Kafka Streams 应用程序是单个“消费者组”,因此只能提交一次主题的偏移量,而添加主题两次将表明该主题获得两次消费者(并且独立进步)。

对于方法KTable#toStream() ,您可以通过 StreamsConfig 禁用缓存参数cache.max.bytes.buffering == 0 .但是,这是一个全局设置,会禁用所有 KTable 的缓存/重复数据删除。 s(参见 http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。

Update: Since Kafka 0.11 it's possible to disable caching for each KTable individually via Materialized parameter.


groupBy方法也有效,即使它需要一些样板。我们正在考虑添加 KStream#toTable()到 API 以简化此转换。是的, reduce 中的第二个参数是新值——因为reduce是用于组合两个值,API没有“旧”和"new"的概念,因此参数没有这样的命名。

关于apache-kafka - Kafka Streams – 在同一主题上获取 KTable 和 KStream 的最佳方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42306086/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com