gpt4 book ai didi

apache-spark - 如何在单个 Spark 作业中调用多个 writeStream 操作?

转载 作者:行者123 更新时间:2023-12-01 23:54:13 27 4
gpt4 key购买 nike

我正在尝试编写一个 Spark Structured Streaming 作业,该作业从 Kafka 主题读取并通过 writeStream 操作写入单独的路径(在执行一些转换之后)。但是,当我运行以下代码时,只有第一个 writeStream 被执行,第二个被忽略。

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()

write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start() \
.awaitTermination()

// transform df to df2

write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start() \
.awaitTermination()

我最初以为我的问题与此有关 post , 但是,在将我的代码更改为以下内容之后:

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()

write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start()

// transform df to df2

write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start()

write_one.awaitTermination()
write_two.awaitTermination()

我收到以下错误:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我不确定为什么 start()awaitTermination() 之间的附加代码会导致上面的错误(但我认为这可能是一个单独的问题在此 answer 中引用到上面的同一帖子)。在同一作业中调用多个 writeStream 操作的正确方法是什么?最好在由 foreachBatch 调用的函数中进行两次写入,还是有更好的方法来实现这一点?

最佳答案

Spark 文档说,如果您需要对多个位置执行写入操作,则需要使用 foreachBatch 方法。

您的代码应该类似于:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}

注意:需要坚持以防止重新计算。

您可以查看更多:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

关于apache-spark - 如何在单个 Spark 作业中调用多个 writeStream 操作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63042583/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com