gpt4 book ai didi

python - Kafka python 优雅关闭消费者

转载 作者:行者123 更新时间:2023-12-04 11:33:47 38 4
gpt4 key购买 nike

我正在尝试正常关闭 kafka 使用者,但脚本会因停止心跳线程而阻塞。
如何使用 kafka-python 在 SIGTERM 上优雅地关闭使用者。
这就是我所做的

import logger as logging
import time
import sys
from kafka import KafkaConsumer
import numpy as np
import signal


log = logging.getLogger(__name__)


class Cons:
def __init__(self):

signal.signal(signal.SIGINT, self.sigterm_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
self.consumer = KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1'])

def sigterm_handler(self, signum, frame):
log.info("Sigterm handler")
self.consumer.close(autocommit=False)
sys.exit(0)



def consume(self):
try:
while True:
records = self.consumer.poll(timeout_ms=500, max_records=500)
for topic_partition, consumer_records in records.items():
for record in consumer_records:
log.info("Got Record - {}".format(record))
#code to manually commit


except ValueError as e:
log.exception("exception")


if __name__ == '__main__':
c=Cons()
c.consume()

启用调试日志后,这是我得到的输出,代码因此被阻止。
^C2020-04-28 07:18:33,050 - MainThread - __main__ - INFO - Sigterm handler
2020-04-28 07:18:33,050 - MainThread - kafka.consumer.group - DEBUG - Closing the KafkaConsumer.
2020-04-28 07:18:33,051 - MainThread - kafka.coordinator - INFO - Stopping heartbeat thread

这背后的原因是什么?在 SIGTERM 或 SIGINT 上关闭消费者的正确方法是什么?

最佳答案

看起来这最终在 2020 年得到修复,因此任何最新版本都应该没问题:
https://github.com/dpkp/kafka-python/commit/91daea329bb40ed80bddef4635770d24b670b0c6

关于python - Kafka python 优雅关闭消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61474757/

38 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com