gpt4 book ai didi

pyspark - 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

转载 作者:行者123 更新时间:2023-12-02 18:07:17 26 4
gpt4 key购买 nike

我想限制从kafka获取数据时的速率。我的代码如下所示:

df = spark.read.format('kafka') \
.option("kafka.bootstrap.servers",'...')\
.option("subscribe",'A') \
.option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
.option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
.option("maxOffsetsPerTrigger",20) \
.load() \
.cache()

但是当我调用df.count()时,结果是 600。我期望的是 20。有谁知道为什么“maxOffsetsPerTrigger”不起作用。

最佳答案

每个分区 (0, 1, 2) 包含 200 条记录,总数为 600 条记录。

正如您在这里看到的:

Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.

这意味着对于每个触发器或获取过程,Kafka 将获取 20 条记录,但总的来说,您仍将获取配置中设置的总记录(每个分区 200 条)。

关于pyspark - 如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51033334/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com