gpt4 book ai didi

java - 如何在 1 周后准确处理来自分布式日志代理(例如 Kafka)的日志?

转载 作者:行者123 更新时间:2023-12-01 11:17:46 25 4
gpt4 key购买 nike

如果我想处理来自 Kafka 的正好 1 周前的日志,可以进行什么设置?
用例是我维护过去 1 周用户 Activity 的累积统计信息。我对最终的一致性很好,不需要统计数据恰好是 1 周。
我有一个流设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的 Activity 都应从统计数据中删除。我可以实现的方法之一是使用批处理(例如 Spark)从统计数据中删除超过 1 周的 Activity 。
有什么方法可以使用流处理从统计数据中删除超过 1 周的用户 Activity ?各种方法的优缺点是什么?
如果我在 Kafka 中至少使用一次并且统计数据偏离了基本事实,那么有什么方法可以定期更正统计数据?

最佳答案

如果您的 Kafka 消息具有正确的时间戳,那么您可以获得前一周时间戳的偏移量。所以你可以使用..

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
documentation说:

Look up the offsets for the given partitions by timestamp. Thereturned offset for each partition is the earliest offset whosetimestamp is greater than or equal to the given timestamp in thecorresponding partition.


获取主题分区列表,可以调用 consumer.assignment() (在 subscribe()assign() 之后)返回 Set<TopicPartition>分配给消费者。 Long map 中的值基本上是时间戳。因此,对于您的所有键,它将是相同的值(即 1 周前的时间戳)
现在,你有一个 Map<TopicPartition, OffsetAndTimestamp> .您现在可以使用 seek(TopicPartition partition, long offset)寻找每个偏移量。
consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));
现在,您的消费者将位于一周前的消息的位置。所以,当你 poll() ,你从上周到现在的投票。
您可以更改时间戳以满足您的要求,例如,任何超过 1 周的时间都意味着,从时间戳 0 到上周时间戳。
所有前一周数据均值, 2weekOldTimestamp - 1weekOldTimestamp .
因此,在这种情况下,您必须寻求 2weekOldTimestamp然后处理每个分区,直到遇到 1weekOldTimestamp

关于java - 如何在 1 周后准确处理来自分布式日志代理(例如 Kafka)的日志?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62682194/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com