gpt4 book ai didi

java - 如何从 Java 中 Kafka 的简单消费者返回的 ByteBufferMessageSet 中获取 OffSet?

转载 作者:行者123 更新时间:2023-11-29 05:40:27 25 4
gpt4 key购买 nike

考虑以下代码:

public static long Offset = 0L;
FetchRequest req = new FetchRequest(KafkaProperties.topic, 0, Offset,10485760);
ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);

问题是如何获取最后的偏移量并设置回变量Offset以从Kafka读取下一批数据?


更新:当我打印数据时,即:

for (MessageAndOffset messageAndOffset : messageSet) { 
System.out.println(messageAndOffset);
}

输出如下:

MessageAndOffset(message(magic = 1, attributes = 0, crc = 2000130375, payload = java.nio.HeapByteBuffer[pos=0 lim=176 cap=176]),296215)
MessageAndOffset(message(magic = 1, attributes = 0, crc = 956398356, payload = java.nio.HeapByteBuffer[pos=0 lim=196 cap=196]),298144)
....
....
MessageAndOffset(message(magic = 1, attributes = 0, crc = 396743887, payload = java.nio.HeapByteBuffer[pos=0 lim=179 cap=179]),299136)

docs说最后一个数字是偏移量

MessageAndOffset(message: Message, offset: Long)

在上述情况下,我最后一次读取的偏移量将是 299136

最佳答案

这样的事情有帮助吗?这样做的一个坏处是它将永远循环。

    long offset = 0;

while (true) {
FetchRequest fetchrequest = new FetchRequest(topicName, 0, offset, 10485760);

ByteBufferMessageSet messages = consumer.fetch(fetchrequest);
for (MessageAndOffset msg : messages) {
System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8"));
offset = msg.offset();
}

}

同样在 0.8 Kafka SimpleConsumer example , 他们有类似下面的东西

    long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();

byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
}

但他们还提到应用程序需要 a_maxread (要读取的最大消息数)参数作为参数传递,因此我们不会永远循环。我是 kafka 的新手,不确定这是否是您要找的。

关于java - 如何从 Java 中 Kafka 的简单消费者返回的 ByteBufferMessageSet 中获取 OffSet?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17808826/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com