
python - How to use kafka.consumer.SimpleConsumer.seek()

Reposted. Author: 行者123. Updated: 2023-11-28 19:17:30

The API documentation is here: http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html

But when I run the code below, I get this exception: %d format: a number is required, not NoneType

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "test-group", "test")
    run = True
    while run:
        try:
            # This seek() call is the one in question; (0, 2) and (0, 0) were tried as well.
            consumer.seek(0, whence=None)
            message = consumer.get_message(block=False, timeout=4000)
        except Exception as e:
            print("Exception while trying to read msg: %s" % e)

When I use the following code instead, the exception is: seek() got an unexpected keyword argument 'partition'

    consumer.seek(0, whence=None, partition=None)  # (0, 2) and (0, 0)

Any ideas? Thanks.

Best answer

Kafka: The Definitive Guide has sample code for seek() written in Java (not Python, but I hope you can get the general idea).

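    import java.util.Collection;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    // Fragment: getOffsetFromDB(), commitDBTransaction(), processRecord(),
    // storeRecordInDB() and storeOffsetInDB() are helpers the book leaves undefined.
    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

        private final KafkaConsumer<String, String> consumer;

        // Not in the book's listing; added so the subscribe() call below compiles.
        public SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        // Persist the stored offsets before the partitions are taken away.
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            commitDBTransaction();
        }

        // Resume each newly assigned partition from the offset saved in the database.
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions)
                consumer.seek(partition, getOffsetFromDB(partition));
        }
    }
    // The book's listing has one more closing brace here; it is omitted so the braces balance.

    consumer.subscribe(topics, new SaveOffsetsOnRebalance(consumer));
    consumer.poll(0);  // join the group so that partitions get assigned

    for (TopicPartition partition : consumer.assignment())
        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
            storeRecordInDB(record);
            storeOffsetInDB(record.topic(), record.partition(), record.offset());
        }
        commitDBTransaction();
    }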
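
For readers who want the same pattern in Python, here is a rough sketch using kafka-python's newer KafkaConsumer rather than SimpleConsumer. It is an illustration under assumptions, not the book's code: InMemoryOffsetStore, seek_to_stored_offsets and run are hypothetical names, a plain dict stands in for the database, and the broker address, topic and group id are simply the ones from the question. Storing offset + 1 (so the last processed record is not read again after a restart) and falling back to offset 0 for unknown partitions are my choices, not something the Java listing does.

```python
class InMemoryOffsetStore:
    """Hypothetical stand-in for the database in the Java example."""

    def __init__(self):
        self._next_offset = {}

    def save(self, topic, partition, offset):
        # Remember the offset of the NEXT record to read (assumption, see lead-in).
        self._next_offset[(topic, partition)] = offset + 1

    def load(self, topic, partition):
        # Partitions never seen before start at offset 0 (assumption).
        return self._next_offset.get((topic, partition), 0)


def seek_to_stored_offsets(consumer, store, partitions):
    """Move the consumer to the saved position of every given partition."""
    for tp in partitions:
        consumer.seek(tp, store.load(tp.topic, tp.partition))


def run(bootstrap_servers="localhost:9092", topic="test", group_id="test-group"):
    # Imported here so the helpers above can be used without kafka-python installed.
    from kafka import KafkaConsumer, ConsumerRebalanceListener

    store = InMemoryOffsetStore()
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,  # positions come from the store, not from Kafka
    )

    class SaveOffsetsOnRebalance(ConsumerRebalanceListener):
        def on_partitions_revoked(self, revoked):
            pass  # a real store would commit its transaction here

        def on_partitions_assigned(self, assigned):
            seek_to_stored_offsets(consumer, store, assigned)

    consumer.subscribe([topic], listener=SaveOffsetsOnRebalance())

    while True:
        # poll() returns a dict that maps TopicPartition to a list of records.
        for tp, records in consumer.poll(timeout_ms=100).items():
            for record in records:
                print(record.value)  # stand-in for processRecord()
                store.save(record.topic, record.partition, record.offset)
```

Calling run() needs a reachable broker. Since the store is only a dict, the saved positions are lost when the process exits; a real implementation would write the records and the offsets in the same database transaction, as the Java version does.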

Regarding "python - How to use kafka.consumer.SimpleConsumer.seek()", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31841960/
