gpt4 book ai didi

asynchronous - 具有异步 goroutines 的 Kafka 消费者

转载 作者:数据小太阳 更新时间:2023-10-29 03:29:18 27 4
gpt4 key购买 nike

我正在为我的消费者使用 sarama ( https://github.com/Shopify/sarama/ ) 和 Kafka 0.8.0。这是我的代码的样子:

consumerLoop:
for {
select {
case event := <-consumer.Events():
if event.Err != nil {
break consumerLoop
panic(event.Err)
}
<-c.sem
go c.processJob(event.Value)
}
}

我正在使用缓冲 channel (c.sem) 来控制一次可以运行多少个 processJob goroutine。这就是我控制消费者的并发/速度的方式。

我在使用这种方法时遇到的问题是,如果我需要更改并发性,我必须关闭使用者并重新启动它( channel 缓冲区大小是一个命令行标志)。我记录了已处理的偏移量,我必须查看我的日志以确定处理了哪些偏移量以及我希望消费者从哪里恢复。我想要一种更免提的方法来管理这些偏移量。

我已经在 consumer.properties 中将 autocommit.enabled 设置为 true,但我在 zookeeper 中没有看到任何变化。我认为这是因为当前的 Kafka 协议(protocol)不支持偏移 API:https://issues.apache.org/jira/browse/KAFKA-993

我可以尝试在处理完作业后手动将偏移量存储在 zookeeper 中,但我不知道这将如何与多个异步 processJob 一起运行。这是 Kafka 应该存储偏移量的地方:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

这应该包含一个值吗?如果那是真的,那将意味着我不能使用异步 processJob,因为不同进程之间可能存在延迟,并且它们会覆盖彼此的值。消费者是否应该在单个线程中运行并一次处理单个事件?启动更多消费者以加快速度而不是走 goroutines 路线的正确方法是什么?

最佳答案

我怀疑最简单的答案是不为信号量使用 channel 。而是使用一个由锁保护的整数,然后您可以在不重新启动的情况下调整最大可用 goroutines。

如果你真的想为此继续使用一个 channel ,你可以使用我的 channel 包中的 ResizableChannel:https://godoc.org/github.com/eapache/channels#ResizableChannel

关于asynchronous - 具有异步 goroutines 的 Kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22239447/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com