gpt4 book ai didi

python - 如何获取 kafka 主题分区的最新偏移量?

转载 作者:IT老高 更新时间:2023-10-28 20:56:29 24 4
gpt4 key购买 nike

我正在为 Kafka 使用 Python 高级消费者,并且想知道主题的每个分区的最新偏移量。但是我无法让它工作。

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()

但我得到的输出是

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None

我有一个使用 assign 的替代方法,但结果是一样的

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()

从一些文档看来,如果没有发出 fetch,我可能会得到这种行为。但我找不到强制执行的方法。我做错了什么?

或者是否有不同/更简单的方法来获取主题的最新偏移量?

最佳答案

在花了一天的时间和几个错误的开始之后,我终于找到了一个解决方案并让它发挥作用。发给她,让其他人可以引用。

from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload

client = SimpleClient(brokers)

partitions = client.topic_partitions[topic]
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]

offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
print "partition = %s, offset = %s"%(r.partition, r.offsets[0])

关于python - 如何获取 kafka 主题分区的最新偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35432326/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com