gpt4 book ai didi

python - 从 Kafka 轮询几条消息

转载 作者:太空宇宙 更新时间:2023-11-04 04:15:01 25 4
gpt4 key购买 nike

我正在使用 confluent_kafka与 Kafka 一起工作的软件包。我以这种方式创建主题:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def my_producer():
bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']

value_schema = avro.load('/home/ValueSchema.avsc')

avroProducer = AvroProducer({
'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'schema.registry.url':'http://my_adress.com:8081',
},
default_value_schema=value_schema
)

for i in range(0, 25000):
value = {"name":"Yuva","favorite_number":10,"favorite_color":"green","age":i*2}
avroProducer.produce(topic='my_topik14', value=value)
avroProducer.flush(0)
print('Finished!')


if __name__ == '__main__':
my_producer()

它有效。 (顺便说一下,这会收到 24820 条消息,而不是 25000 条……)我们可以检查一下:

kafka-run-class kafka.tools.GetOffsetShell --broker-list my_adress.com:9092 --topic my_topik14
my_topik14:0:24819

现在我要消费:

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

bootstrap_servers=['my_adress.com:9092',
'my_adress.com:9092']
c = AvroConsumer(
{'bootstrap.servers': bootstrap_servers[0]+','+bootstrap_servers[1],
'group.id': 'avroneversleeps',
'schema.registry.url': 'http://my_adress.com:8081',
'api.version.request': True,
'fetch.min.bytes': 100000,
'consume.callback.max.messages':1000,
'batch.num.messages':2
})
c.subscribe(['my_topik14'])
running = True

while running:
msg = None
try:
msg = c.poll(0.1)
if msg:
if not msg.error():
print(msg.value())
c.commit(msg)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
else:
print("No Message!! Happily trying again!!")
except SerializerError as e:
print("Message deserialization failed for %s: %s" % (msg, e))
running = False
c.commit()
c.close()

但是有个问题:我只是一条一条地阅读消息。我的问题是如何阅读批量消息?我在 Consumer 配置中尝试了不同的参数,但它们没有任何反应!


我还找到了this question在 SO 上并尝试了相同的参数 - 它仍然不起作用。

另请阅读此内容。但这违背了之前的链接...

最佳答案

关于python - 从 Kafka 轮询几条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55618541/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com