gpt4 book ai didi

apache-spark - 默认(未指定)触发器如何确定结构化流中微批处理的大小?

转载 作者:行者123 更新时间:2023-12-05 05:09:21 25 4
gpt4 key购买 nike

当查询执行In Spark Structured Streaming没有设置trigger时,

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
//.trigger(???) // <--- Trigger intentionally omitted ----
.start()

截至 Spark 2.4.3(2019 年 8 月)。 Structured Streaming Programming Guide - Triggers

If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.

问题:默认触发器根据什么决定微批处理的大小?

说吧。输入源是 Kafka。由于一些中断,工作中断了一天。然后重新启动同一个 Spark 作业。然后它将在它停止的地方使用消息。这是否意味着第一个微批处理将是一个巨大的批处理,其中有 1 天的消息在作业停止时累积在 Kafka 主题中?假设作业需要 10 小时来处理那个大批量,那么下一个微批处理有 10 小时的消息?并逐渐直到 X 次迭代以 catch 积压以达到更小的微批处理。

最佳答案

On which basis the default trigger determines the size of the micro-batches?

事实并非如此。每个触发器(无论多长)都只是请求输入数据集的所有来源,并且它们提供的任何内容都由运算符(operator)在下游处理。消息来源知道应该提供什么,因为他们知道到目前为止已经消费(处理)了什么。

就好像您询问了批处理结构化查询以及此单个“触发器”请求处理的数据大小(顺便说一句,有 ProcessingTime.Once 触发器)。

Does that mean the first micro-batch will be a gigantic batch with 1 day of msg which accumulated in the Kafka topic while the job was stopped?

几乎(与​​ Spark Structured Streaming 几乎没有任何关系)。

底层 Kafka 消费者获取处理的记录数由 max.poll.records 和可能由一些其他配置属性配置(参见 Increase the number of messages read by a Kafka consumer in a single poll )。

由于 Spark Structured Streaming 使用 Kafka 数据源,它只是 Kafka Consumer API 的包装器,因此单个微批处理中发生的任何事情都等同于此单个 Consumer.poll 调用。

您可以使用带有 kafka. 前缀的选项(例如 kafka.bootstrap.servers)来配置底层 Kafka 消费者,这些选项被认为是驱动程序和执行程序上的 Kafka 消费者.

关于apache-spark - 默认(未指定)触发器如何确定结构化流中微批处理的大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57612213/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com