gpt4 book ai didi

python - Kafka生产者不选择新分区

转载 作者:行者123 更新时间:2023-11-28 19:08:14 27 4
gpt4 key购买 nike

我是 Kafka 的新手,正在尝试在其上构建一个服务消息传递平台。这是我的设置:

卡夫卡 0.9.0.1
动物园管理员 3.4.8
kafka-python 1.3.3

我的应用程序创建了一个 KafkaProducer,我从中向具有 6 个分区的单个主题发送消息流。我还创建了 7 个 KafkaConsumer(在单个 group_id 下),其中 6 个分配给 6 个分区,一个处于空闲状态(这是预期的)。虽然生产者正在流式传输,我将分区数增加到 7,期望流不会分布在 7 个分区中并唤醒空闲的消费者。但是,生产者似乎没有拿起新添加的分区直到我通过重新启动应用程序重新初始化它。我通过运行以下命令来缩放分区数:

kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7

有没有办法让生产者在不重新初始化的情况下获取分区计数的变化?

这是相关的代码片段:

制作人

class Producer(threading.Thread):
daemon = True

def __init__(self, name, manager):
super(Producer, self).__init__()
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')

def run(self):
while not self.killed:
if not self.q.empty():
self._busy()
self.producer.send('test', value=self.q.get())
else:
self._free()

消费者

class Consumer(threading.Thread):
daemon = True

def __init__(self, name, manager):
super(Consumer, self).__init__()
self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
group_id='test_group',
client_id="Consumer " + self.name)
self.consumer.subscribe(['test'])

def run(self):
while not self.killed:
messages = self.consumer.poll()

for topic, records in messages.iteritems():
print self.consumer.config['client_id'] + ": " + str(records)

最佳答案

我遇到了一个可能类似的问题并找到了解决方案。我写在这里: How does librdkafka producer learn about new topic partitions in Kafka

如果您的测试时间太短,这可能是生产者没有了解新分区的原因。参数 topic.metadata.refresh.interval.ms 默认为 300000(以毫秒为单位),因此代理将每 5 分钟刷新生产者中的元数据。如果您的测试在添加分区后花费了 5 分钟以上,则这不是原因。

关于python - Kafka生产者不选择新分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44057119/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com