gpt4 book ai didi

apache-spark - 如何在一个微批中设置最大行数?

转载 作者:IT王子 更新时间:2023-10-29 06:03:48 26 4
gpt4 key购买 nike

我正在通过以下代码使用 spark-structured-streaming foreachBatch 从 Redis 读取批记录(尝试通过 stream.read.batch.size 设置 batchSize)

val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()

val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}

目前我们将 stream.read.batch.size 设置为 128,但这似乎不起作用。 batchSize 似乎是随机的,有时超过 1000 甚至 10000。

但是我不想等这么久(10000条记录)因为我有一些操作(在代码注释//一些操作)需要尽快完成,这样我想控制最大批大小以便当记录达到此限制时可以立即处理,怎么办?

最佳答案

我是 spark-redis 的维护者。目前不支持此功能。 stream.read.batch.size 参数控制单个 Redis API 调用读取的项目数(XREADGROUP 调用的 count 参数)。它不影响每个触发器的项目数(batchDF 大小)。我已经在 github 上为这个功能请求开了一张票。

关于apache-spark - 如何在一个微批中设置最大行数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56679474/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com