gpt4 book ai didi

apache-kafka - 如何处理 Kafka Connect Sink 中的背压?

转载 作者:行者123 更新时间:2023-12-03 14:29:11 25 4
gpt4 key购买 nike

我们构建了一个自定义的 Kafka Connect 接收器,该接收器又调用远程 REST API。如何将背压传播到 Kafka Connect 基础架构,以便在远程系统比内部消费者向 put() 传递消息的速度慢的情况下调用 put() 的频率降低?
Kafka 连接文档说我们不应该在 put() 中阻塞,而是在 flush() 中阻塞。但是在 put() 中不阻塞意味着我们必须缓冲数据,这肯定会在某些时候导致 OOM 异常,如果 put() 比 flush() 更频繁地调用。
我已经看到允许 kafka 消费者调用 pause() 或在 loop() 中阻塞。是否可以在 kafka 连接接收器中利用它?

最佳答案

I've seen that a kafka consumer is allowed to call pause() or block in the loop(). Is it possible to leverage this in a kafka connect sink?



原始消费者没有暴露,所以没有。您可以调用 /pause在整个连接器上,尽管我不确定当时未刷新的消息会发生什么。

But not blocking in put() means that we have to buffer data which surely leads to OOM exceptions at some point



当然可以,但这确实是保持数据超过必要时间的唯一可行选择。例如,这就是 S3 和 HDFS 连接器的工作方式。

rotate.interval.ms
The time interval in milliseconds to invoke file commits...



您的 HTTP 客户端连接可能无论如何都会阻止发出请求,不是吗?

另一种方法是让你的 HTTP 服务器嵌入一个 Kafka 消费者,这样它就可以自己轮询消息并在本地对它们进行操作,而不需要通过 HTTP 发送请求。

关于apache-kafka - 如何处理 Kafka Connect Sink 中的背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49913994/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com