gpt4 book ai didi

scala - 如何节流 Spark Streaming?

转载 作者:行者123 更新时间:2023-12-05 03:09:24 24 4
gpt4 key购买 nike

这个问题有点偏离我关于管理 AmazonDynamoDbClient 节流和重试的其他问题。但是,我认为解决方案可能在我进行发电机调用之前就已经存在。

我的高级流程如下:我有一个 Scala 应用程序,它利用 Apache Spark 读取大型 CSV 文件并对它们执行一些聚合,然后将它们写入 dynamo。我将其部署到 EMR 以提供可扩展性。问题是,一旦聚合完成,我们就有数百万条记录准备好进入 dynamo,但我们有 dynamo 的写入能力。它们不需要立即插入,但最好控制每秒插入的数量,以便我们可以针对我们的用例对其进行微调。

这是我目前所拥有的代码示例:

val foreach = new ForeachWriter[Row] {
override def process(value: Row): Unit = {
//write to dynamo here
}

override def close(errorOrNull: Throwable): Unit = {
}

override def open(partitionId: Long, version: Long): Boolean = {
true
}
}

val query = dataGrouped
.writeStream
.queryName("DynamoOutput")
.format("console")
.foreach(foreach)
.outputMode(OutputMode.Complete())
.start()
.awaitTermination()

有没有人对如何解决这个问题有任何建议?

最佳答案

您应该查看 spark.streaming.backpressure.enabled 配置。来自documentation :

设置最大接收速率 - 如果集群资源不足以让流式处理应用程序以接收数据的速度处理数据,则可以通过设置最大速率限制来限制接收者的速率记录/秒请参阅接收器的配置参数 spark.streaming.receiver.maxRate 和直接 Kafka 方法的 spark.streaming.kafka.maxRatePerPartition。在 Spark 1.5 中,我们引入了一个称为背压的特性,无需设置此速率限制,因为 Spark Streaming 会自动计算出速率限制,并在处理条件发生变化时动态调整它们。可以通过将配置参数 spark.streaming.backpressure.enabled 设置为 true 来启用此背压。

关于scala - 如何节流 Spark Streaming?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43076109/

24 4 0