gpt4 book ai didi

python - 使用结构化流(PySpark)运行链式查询

转载 作者:太空宇宙 更新时间:2023-11-03 14:03:45 24 4
gpt4 key购买 nike

我的代码是这样的

df = spark.readStream.option("header","true") \
.schema(df_schema)\
.csv(df_file)
df2 = df.filter(df.col == 1)
df3 = df2.withColumn("new_col", udf_f(df2.some_col))
dfc = df3.where(df3.new_col == 2).count()
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

我收到错误消息使用流源的查询必须在 dfc 行使用 writeStream.start() 执行,但我不确定我在做什么错误的。 Spark结构化流不支持这样的链式查询吗?据我所知,我没有做任何分支。

编辑:

通过从 dfc 行中删除 count(),我收到了一个新错误 StreamingQueryException: Exception returned in awaitResult,该错误是由 >query.awaitTermination() 调用。知道为什么 count() 不起作用以及为什么出现新错误吗?

编辑2:

如果我直接登录到控制台而不运行 df 之后的所有中间查询,它就可以工作。但是,每次我尝试运行其他查询时,都会引发 StreamingQueryException

最佳答案

由于 structured streaming 的性质不可能以与静态数据帧相同的方式获得计数。创建流时,Spark 使用 trigger 轮询源以获得新数据。如果有任何 Spark,会将其拆分为小型 DataFrame(微批处理)并沿流传递(转换、聚合、输出)。

如果您需要获取记录数,您可以添加 listener to get progress updates并在 onQueryProgress(QueryProgressEvent event) 中获取输入数量。

很难说为什么会出现 StreamingQueryException,因为 filter()withColumn() 在结构化流中正常工作。您是否在控制台中看到了可能导致 awaitResult 中引发异常 的其他错误?

顺便说一句,如果单个 session 中有多个流,则应使用 spark.streams.awaitAnyTermination() 进行阻塞,直到其中任何一个流终止。

以下查询应该可以正常工作:

query = spark.readStream
.option("header","true") \
.schema(df_schema)\
.csv(df_file)\
.filter(df.col == 1)\
.withColumn("new_col", udf_f(df2.some_col))\
.writeStream\
.format("console")\
.outputMode("append")\
.start()

query.awaitTermination()
# or spark.streams().awaitAnyTermination()

关于python - 使用结构化流(PySpark)运行链式查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49064338/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com