gpt4 book ai didi

python - Kafka python消费者组 session 超时

转载 作者:行者123 更新时间:2023-12-04 04:12:40 34 4
gpt4 key购买 nike

我使用的是 confluent-kafka v1.3.0,我有以下消费者组 session 超时问题。我的配置如下:

c['KAFKA'] = {
'bootstrap.servers': 'host.docker.internal:9104',
'consumer': {
'group.id': 'consumer',
'enable.auto.commit': True,
'default.topic.config': {
'auto.offset.reset': 'earliest
},
'heartbeat.interval.ms': 100000,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 100000
},
}

代码中的逻辑如下:

consumer.subscribe('database_changes')

with ThreadPoolExecutor(max_workers=500) as executor:
while True:
msg = consumer.poll(100)
if msg is not None:
executor.submit(process_message, msg)

函数处理消息中的代码等待几毫秒,因为它的逻辑非常简单。一切正常,但我每时每刻都收到此错误:

{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}

这些重新平衡极大地阻碍了整个过程。

有没有人知道什么会被错误设置?我怀疑心跳不起作用,但我不知道它如何验证或更好地修复。

谢谢

最佳答案

根据 confluent kafka 文档,heartbeat.interval.ms 应设置为不高于 session.timeout.ms 的 1/3。由于在 sessions.timeout.ms 值之后,消费者被认为已经死亡,建议至少等待三个心跳来假设。

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#heartbeat.interval.ms

关于python - Kafka python消费者组 session 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61455053/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com