gpt4 book ai didi

java - 如何更改Kafka拓扑的消费者偏移量?

转载 作者:行者123 更新时间:2023-12-02 11:51:54 26 4
gpt4 key购买 nike

现在,我正在开发一个使用 Kafka Streams 处理消息的项目。我们的消息由两个标识符组成,其中一个用于用户标识符,第二个用于用户所属消息列表的标识符。

该应用程序的故事是,客户希望通过我的应用程序向其用户列表发送推送消息。因此,客户端在向用户发送推送消息之前从外部源加载用户列表,然后开始发送。但客户端也希望随时停止正在进行的发送。

此时我的问题出现了。我想停止发送推送消息,而不通过其流 API 消耗来自 kafka 的所有消息。我用谷歌搜索将流源的偏移量更改为最新,但找不到正确的方法来完成它。

我怎样才能实现它?谢谢。

最佳答案

Kafka Streams 不支持此功能。如果您确实需要这样做,您可以停止 Kafka Streams 并使用 bin/kafka-consumer-groups.sh 手动查找 Kafka Streams 应用程序(注意:application.id = = 组.id)。之后您可以重新启动 Kafka Streams。

顺便说一句:在即将发布的 1.1 版本中,bin/kafka-streams-application-reset.sh 还将允许操作偏移量。我希望我们还能够设法使内部使用的类 StreamsResetter 成为公共(public) API——这将允许您在我们的应用程序中以编程方式执行此操作,而无需退回到命令行。

关于java - 如何更改Kafka拓扑的消费者偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47827722/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com