gpt4 book ai didi

apache-spark - 如何在 Spark Structured Streaming 中指定批处理间隔?

转载 作者:行者123 更新时间:2023-12-03 20:27:36 24 4
gpt4 key购买 nike

我正在使用 Spark Structured Streaming 并遇到了一个问题。

在 StreamingContext、DStreams 中,我们可以定义批处理间隔如下:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval

如何在结构化流媒体中做到这一点?

我的流媒体是这样的:
sparkStreaming = SparkSession \
.builder \
.appName("StreamExample1") \
.getOrCreate()

stream_df = sparkStreaming.readStream.schema("col0 STRING, col1 INTEGER").option("maxFilesPerTrigger", 1).\
csv("C:/sparkStream")

sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()

此代码按预期工作,但是,如何/在哪里定义批处理间隔?

我是结构化流媒体的新手,请指导我。

最佳答案

tl;博士 使用 trigger(...) (在 DataStreamWriter 上,即在 writeStream 之后)

这是一个很好的来源https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html .
有多种选择,如果您不设置批处理间隔,Spark 会在处理完最后一个批处理后立即查找数据。触发器就是这里。
从手册:

The trigger settings of a streaming query defines the timing ofstreaming data processing, whether the query is going to executed asmicro-batch query with a fixed batch interval or as a continuousprocessing query.


一些例子:
默认触发器(尽快运行微批处理)
df.writeStream \
.format("console") \
.start()
ProcessingTime 触发器,微批次间隔为两秒
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
一次性触发
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
具有一秒检查点间隔的连续触发
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()

关于apache-spark - 如何在 Spark Structured Streaming 中指定批处理间隔?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57760563/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com