作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如果我想处理来自 Kafka 的正好 1 周前的日志,可以进行什么设置?
用例是我维护过去 1 周用户 Activity 的累积统计信息。我对最终的一致性很好,不需要统计数据恰好是 1 周。
我有一个流设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的 Activity 都应从统计数据中删除。我可以实现的方法之一是使用批处理(例如 Spark)从统计数据中删除超过 1 周的 Activity 。
有什么方法可以使用流处理从统计数据中删除超过 1 周的用户 Activity ?各种方法的优缺点是什么?
如果我在 Kafka 中至少使用一次并且统计数据偏离了基本事实,那么有什么方法可以定期更正统计数据?
最佳答案
如果您的 Kafka 消息具有正确的时间戳,那么您可以获得前一周时间戳的偏移量。所以你可以使用..
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
documentation说:
Look up the offsets for the given partitions by timestamp. Thereturned offset for each partition is the earliest offset whosetimestamp is greater than or equal to the given timestamp in thecorresponding partition.
consumer.assignment()
(在
subscribe()
或
assign()
之后)返回
Set<TopicPartition>
分配给消费者。
Long
map 中的值基本上是时间戳。因此,对于您的所有键,它将是相同的值(即 1 周前的时间戳)
Map<TopicPartition, OffsetAndTimestamp>
.您现在可以使用
seek(TopicPartition partition, long offset)
寻找每个偏移量。
consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));
现在,您的消费者将位于一周前的消息的位置。所以,当你
poll()
,你从上周到现在的投票。
2weekOldTimestamp - 1weekOldTimestamp
.
2weekOldTimestamp
然后处理每个分区,直到遇到
1weekOldTimestamp
关于java - 如何在 1 周后准确处理来自分布式日志代理(例如 Kafka)的日志?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62682194/
我是一名优秀的程序员,十分优秀!