gpt4 book ai didi

faust - 如何将 Faust 中的消费者设置为特定的偏移量

转载 作者:行者123 更新时间:2023-12-04 04:17:39 26 4
gpt4 key购买 nike

从 Faust 文档中我无法找到如何将消费者设置为特定的偏移量。

对于 confluent-kafka,我使用 consumer.offsets_for_times 来查找 start_offset,然后将 TopicPartition 分配给该特定偏移量,例如:

start_offset = consumer.offsets_for_times([
TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])

consumer.assign([
TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])

对于浮士德,我找不到更多的东西:

consumer_auto_offset_reset

只允许您设置最早或最晚。我如何从特定的时间或一天的开始开始阅读?

最佳答案

要将偏移量设置为特定值,您可以使用这些示例。在这里,我将偏移量设置为 50000。每次启动我的应用程序时,代理都会从偏移量 50000 处开始读取。为此,我使用 app.consumer.seek

这里 tp 有两个参数,topic - 在本例中为 test,0 是分区号。更多信息 faust.types

from faust.types import TP, Message

tp = TP("test", 0)
topic = app.topic(tp.topic)

@app.task()
async def on_start():
await app.consumer.seek(tp, 50000)
print("App startet")

@app.agent(topic)
async def receive(stream):
async for event in stream.events():
print((event.message.offset, event.value))

关于faust - 如何将 Faust 中的消费者设置为特定的偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60267358/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com