Python Kafka consumer reads messages that were already read

Reposted. Author: 行者123. Updated: 2023-12-02 19:25:23

Kafka consumer code:

def test():
    TOPIC = "file_data"
    producer = KafkaProducer()
    producer.send(TOPIC, "data")
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000
    )
    topic_partition = TopicPartition(TOPIC, 0)
    assigned_topic = [topic_partition]
    consumer.assign(assigned_topic)
    consumer.seek_to_beginning(topic_partition)
    for message in consumer:
        print("%s key=%s value=%s" % (message.topic, message.key, message.value))
    consumer.commit()

Expected behavior: it should read only the last message written by the producer. It should print only:

file_data key=None value=b'data'

Current behavior: after running the code, it prints:

file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'

Best answer

from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import KafkaProducer

def test():
    TOPIC = "file_data"
    producer = KafkaProducer()
    # Without a value_serializer, the payload must be bytes.
    producer.send(TOPIC, b'data')
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000
    )
    topic_partition = TopicPartition(TOPIC, 0)
    assigned_topic = [topic_partition]
    consumer.assign(assigned_topic)
    # Do not rewind: resume from the offset committed for this group.
    # consumer.seek_to_beginning(topic_partition)
    for message in consumer:
        print("%s key=%s value=%s" % (message.topic, message.key, message.value))
    # Commit the consumed position so the next run starts after these messages.
    consumer.commit()

test()

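This version behaves as you expect. The change that matters is that the seek_to_beginning call is commented out. In the original code it rewinds the consumer to the earliest offset of the partition on every run, ignoring the offset committed for the group, so every message ever written to the topic is printed again. Call seek_to_beginning only if you actually want to re-read the partition from the start.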
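To make the offset bookkeeping concrete, here is a small in-memory sketch. It is not kafka-python: ToyPartition and run are hypothetical names invented for this illustration, and the model only mimics how a committed offset and a rewind interact. It also assumes the group already has a committed offset in the "resume" case.

```python
class ToyPartition:
    """Toy stand-in for a single partition plus per-group committed offsets."""

    def __init__(self):
        self.log = []        # records, indexed by offset
        self.committed = {}  # group_id -> next offset to read

    def append(self, value):
        self.log.append(value)

    def consume(self, group_id, seek_to_beginning=False):
        """Return the records one run would see, then commit the position."""
        if seek_to_beginning:
            # Rewind: the committed offset is ignored.
            position = 0
        else:
            # Resume from the committed offset; with none, start at the end
            # (this mirrors auto_offset_reset='latest').
            position = self.committed.get(group_id, len(self.log))
        records = self.log[position:]
        self.committed[group_id] = len(self.log)
        return records


def run(partition, group_id, seek_to_beginning):
    """One execution of test(): produce one record, then consume."""
    partition.append(b"data")
    return partition.consume(group_id, seek_to_beginning)


# Rewinding on every run re-reads the whole log, which keeps growing.
rewinding = ToyPartition()
rewind_runs = [run(rewinding, "Group2", True) for _ in range(3)]

# Resuming from the committed offset yields only the new record each run.
resuming = ToyPartition()
resuming.committed["Group2"] = 0  # assumption: the group has committed before
resume_runs = [run(resuming, "Group2", False) for _ in range(3)]

print([len(r) for r in rewind_runs])  # [1, 2, 3]
print([len(r) for r in resume_runs])  # [1, 1, 1]
```

In this model the rewinding consumer prints one more line on each run, which matches the growing output shown in the question.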

Reference: seek_to_beginning
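If the goal is strictly to read only the newest record, regardless of what the group has committed, one possible approach is to position the consumer one offset before the end of the partition. The helper below is a sketch under assumptions: seek_to_last_record is a hypothetical name, and it relies on the kafka-python KafkaConsumer methods end_offsets, beginning_offsets, and seek. It also assumes the partition has already been assigned and that the producer has flushed (for example with producer.flush()) before it is called. On compacted or transactional topics the offset end - 1 may not hold a readable record.

```python
def seek_to_last_record(consumer, topic_partition):
    """Position the consumer on the newest record of an assigned partition.

    Returns the offset that was seeked to, or None if the partition is empty.
    """
    # end_offsets gives the offset of the next record to be written,
    # that is, the last existing offset plus one.
    end = consumer.end_offsets([topic_partition])[topic_partition]
    begin = consumer.beginning_offsets([topic_partition])[topic_partition]
    if end <= begin:
        return None  # nothing to read
    consumer.seek(topic_partition, end - 1)
    return end - 1
```

In the answer's test() function this would be called after consumer.assign(assigned_topic), in place of the commented-out seek_to_beginning call.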

Regarding "Python Kafka consumer reads messages that were already read", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62482984/
