
python - Kafka: How to consume data based on Timestamp

Reposted. Author: 行者123. Updated: 2023-12-01 01:54:11

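Apart from offsets, is there another way to fetch the data that falls within a given time interval? For example, if I want to consume all of yesterday's data, how can I do that?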

Best answer

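Use offsetsForTimes (called offsets_for_times in the kafka-python client) to get the offset that corresponds to the desired timestamp, then seek to that offset. In Python it looks like this: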

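```python
from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

topic = "www.kilskil.com"
broker = "localhost:9092"

# let's check the messages of the first day of the New Year
# (naive datetimes are interpreted as local time by .timestamp())
date_in = datetime(2019, 1, 1)
date_out = datetime(2019, 1, 2)

# no group_id: we only read, so there is nothing to commit;
# consumer_timeout_ms ends the for-loop below if no new messages arrive
consumer = KafkaConsumer(bootstrap_servers=broker, enable_auto_commit=False,
                         consumer_timeout_ms=10000)

tp = TopicPartition(topic, 0)  # partition no. 0
# in the simple case, without any special Kafka configuration, a topic
# has only one partition and its number is 0
consumer.assign([tp])  # manual assignment lets us seek() without a dummy poll()

# in fact, the question comes down to two methods: offsets_for_times() and seek()
# Kafka expects integer milliseconds since the epoch
rec_in = consumer.offsets_for_times({tp: int(date_in.timestamp() * 1000)})
rec_out = consumer.offsets_for_times({tp: int(date_out.timestamp() * 1000)})

# a value is None when no message has a timestamp >= the requested one
if rec_in[tp] is None:
    raise SystemExit("no messages on or after {}".format(date_in))
if rec_out[tp] is not None:
    end_offset = rec_out[tp].offset
else:
    end_offset = consumer.end_offsets([tp])[tp]  # window runs to the end of the log

consumer.seek(tp, rec_in[tp].offset)  # go to the first message of the New Year!

c = 0
for msg in consumer:
    if msg.offset >= end_offset:
        break
    c += 1
    # each message also has a .timestamp field (milliseconds)
    if msg.offset + 1 >= end_offset:
        break  # last message of the window, don't wait for more

print("{c} messages between {_in} and {_out}".format(c=c, _in=str(date_in), _out=str(date_out)))
```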
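The answer above reads only partition 0. For a topic with several partitions, the same offsets_for_times() + seek() idea can be applied to each partition. The following is a minimal sketch, assuming the kafka-python KafkaConsumer methods offsets_for_times, end_offsets, assign, seek and pause; the helper names offset_ranges and consume_window are hypothetical. The window is defined by offsets: from the first offset at or after start_ms up to, but excluding, the first offset at or after end_ms.

```python
def offset_ranges(start_lookup, stop_lookup, end_offsets):
    """Turn offsets_for_times() results into {partition: (first, stop)} ranges.

    start_lookup / stop_lookup: {partition: OffsetAndTimestamp or None}, as
    returned by offsets_for_times() for the window's start and end times.
    end_offsets: {partition: int}, as returned by end_offsets().
    `stop` is exclusive; partitions with nothing in the window are omitted.
    """
    ranges = {}
    for tp, start in start_lookup.items():
        if start is None:
            continue  # no message at or after the start time
        stop = stop_lookup.get(tp)
        # no message at or after the end time: read up to the end of the log
        stop_offset = stop.offset if stop is not None else end_offsets[tp]
        if start.offset < stop_offset:
            ranges[tp] = (start.offset, stop_offset)
    return ranges


def consume_window(consumer, partitions, start_ms, end_ms):
    """Yield each partition's messages between the offsets found for start_ms and end_ms.

    Assumes `consumer` is a kafka-python KafkaConsumer created without a topic
    (ideally with consumer_timeout_ms set, so it cannot block forever) and
    `partitions` is a list of TopicPartition objects.
    """
    ranges = offset_ranges(
        consumer.offsets_for_times({tp: start_ms for tp in partitions}),
        consumer.offsets_for_times({tp: end_ms for tp in partitions}),
        consumer.end_offsets(partitions),
    )
    if not ranges:
        return
    consumer.assign(list(ranges))  # only partitions that have data in the window
    for tp, (first, _) in ranges.items():
        consumer.seek(tp, first)
    # messages carry .topic and .partition, so index the ranges by that pair
    remaining = {(tp.topic, tp.partition): (tp, stop) for tp, (_, stop) in ranges.items()}
    for msg in consumer:
        key = (msg.topic, msg.partition)
        if key not in remaining:
            continue  # this partition is already finished
        tp, stop = remaining[key]
        if msg.offset < stop:
            yield msg
        if msg.offset + 1 >= stop:
            consumer.pause(tp)  # stop fetching this partition
            del remaining[key]
            if not remaining:
                return
```

With a real consumer, the partition list could be built as [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)], and sum(1 for _ in consume_window(consumer, partitions, start_ms, end_ms)) would count the messages in the window. Messages from different partitions arrive interleaved, so the output has no global order.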

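Don't forget that Kafka measures timestamps in milliseconds, as a long (64-bit integer). Python's datetime.timestamp() returns seconds as a float, so we multiply by 1000 and convert the result to int. The offsets_for_times method returns a dict with TopicPartition keys and OffsetAndTimestamp values; the value for a partition is None when that partition has no message with a timestamp at or after the requested one.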
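The question asks for "all of yesterday's data", while the example above uses naive datetimes, which .timestamp() interprets in local time. Below is a minimal sketch of computing the millisecond bounds of a calendar day, assuming UTC day boundaries are wanted; the names day_window_ms and yesterday_window_ms are hypothetical. It uses integer timedelta arithmetic, so no float ever reaches offsets_for_times, and the resulting values can be used directly, e.g. {tp: start_ms}.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)


def day_window_ms(day):
    """Return (start_ms, end_ms) for one calendar day in UTC; end is exclusive."""
    start = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    # timedelta // timedelta is an exact integer, unlike timestamp() * 1000
    return (start - EPOCH) // ONE_MS, (end - EPOCH) // ONE_MS


def yesterday_window_ms(now=None):
    """Window for the UTC calendar day before `now` (default: the current time)."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return day_window_ms(now.date() - timedelta(days=1))


print(day_window_ms(date(2019, 1, 1)))  # (1546300800000, 1546387200000)
```

To use local-time days instead, the same arithmetic works with a different tzinfo in day_window_ms.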

Regarding "python - Kafka: How to consume data based on Timestamp", there is a similar question on Stack Overflow: https://stackoverflow.com/questions/50405509/
