
python - KeyError: kafka.producer.record_accumulator.RecordBatch

Reposted · Author: 行者123 · Updated: 2023-11-30 22:28:14

I am using the kafka-python API to send a batch of messages to a topic. Some of the messages reach the topic successfully, but not all of them are sent before the program terminates, and the following error is printed:

KeyError: <kafka.producer.record_accumulator.RecordBatch object at 0x143d290>
Batch is already closed -- ignoring batch.done()
Error processing errback
Traceback (most recent call last):
  File "/usr/lib/python2.6/site-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 185, in _failed_produce
    self._complete_batch(batch, error, -1, None)
  File "/usr/lib/python2.6/site-packages/kafka/producer/sender.py", line 243, in _complete_batch
    self._accumulator.deallocate(batch)
  File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 507, in deallocate
    self._incomplete.remove(batch)
  File "/usr/lib/python2.6/site-packages/kafka/producer/record_accumulator.py", line 587, in remove
    return self._incomplete.remove(batch)

On each run, a different number of messages actually arrives in the topic. The problem seems to be that the KafkaProducer.send calls have not finished sending by the time the program ends.

According to the Kafka documentation, Producer.send is an asynchronous method, which is probably the root cause: not all buffered records are sent before the process is terminated:

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

There are simple workarounds for this (for example, setting batch.size to a small number), but they are likely to create a performance bottleneck.

How would you solve this without sacrificing too much performance?

Best Answer

Simply call Producer.flush() before exiting.
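To see why this works without a running broker, here is a minimal stand-in (not the real kafka-python client) that mimics the same contract: send() only enqueues the record and returns immediately, a background thread drains the buffer, and flush() blocks until the buffer is empty. The ToyProducer class and its delays are illustrative assumptions; with the real client you would call `KafkaProducer.send(...)` and `KafkaProducer.flush()` in the same places.

```python
import queue
import threading
import time


class ToyProducer:
    """Toy stand-in for KafkaProducer: send() only buffers the message;
    a background thread delivers it later. Exiting without flush() can
    therefore lose whatever is still sitting in the buffer."""

    def __init__(self):
        self._buffer = queue.Queue()
        self.delivered = []
        worker = threading.Thread(target=self._drain, daemon=True)
        worker.start()

    def send(self, msg):
        # Enqueue and return immediately, like KafkaProducer.send().
        self._buffer.put(msg)

    def _drain(self):
        while True:
            msg = self._buffer.get()
            time.sleep(0.01)  # simulate per-record network latency
            self.delivered.append(msg)
            self._buffer.task_done()

    def flush(self):
        # Block until every buffered message has been delivered,
        # like KafkaProducer.flush().
        self._buffer.join()


producer = ToyProducer()
for i in range(20):
    producer.send("message-%d" % i)

# Right after the send loop, messages are usually still pending;
# returning here (i.e. exiting the program) would drop them.
producer.flush()  # wait for the buffer to empty before exiting
assert len(producer.delivered) == 20
```

Calling flush() once at the end keeps batching intact, so throughput is unaffected; shrinking batch.size instead would defeat the batching that send() exists to provide. Note that with the real client, producer.close() also flushes pending records before shutting down.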

Regarding python - KeyError: kafka.producer.record_accumulator.RecordBatch, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46672837/
