gpt4 book ai didi

apache-kafka - 在Kafka中如何根据生产时间获得确切的偏移量

转载 作者:行者123 更新时间:2023-12-04 00:21:39 26 4
gpt4 key购买 nike

我需要一天一小时地获取 Kafka 中生成的消息。每隔一小时,我将启动一项工作来消费 1 小时前生成的消息。例如,如果当前时间是 20:12,我将在 19:00:00 和 19:59:59 之间消费消息。这意味着我需要在 19:00:00 时间开始偏移并在 19:59:59 时间结束偏移。我使用 SimpleConsumer.getOffsetsBefore 如「 0.8.0 SimpleConsumer Example 所示」。问题是返回的偏移量与作为参数给出的时间戳不匹配。例如当时间戳为 19:00:00 时,我收到了在 16:38:00 时间生成的消息。

最佳答案

下面kafka消费者api方法getOffsetsByTimes()可以用于此,它从 0.10.0 版本或更高版本可用。见 JavaDoc .

/**
* Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
*
* This is a blocking call. The consumer does not have to be assigned the partitions.
* If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
* will be returned for that partition.
*
* Notice that this method may block indefinitely if the partition does not exist.
*
* @param timestampsToSearch the mapping from partition to the timestamp to look up.
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws IllegalArgumentException if the target timestamp is negative.
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
// we explicitly exclude the earliest and latest offset here so the timestamp in the returned
// OffsetAndTimestamp is always positive.
if (entry.getValue() < 0)
throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
entry.getValue() + ". The target time cannot be negative.");
}
return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}

关于apache-kafka - 在Kafka中如何根据生产时间获得确切的偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22558933/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com