gpt4 book ai didi

apache-spark - 结构化流写入多个流

转载 作者:行者123 更新时间:2023-12-04 13:18:27 24 4
gpt4 key购买 nike

我的场景

  1. 从流中获取数据并调用返回 json 字符串的 UDF。 JSON 字符串中的属性之一是 UniqueId,UDF 将其生成为 guid.newGuid() (C#)。
    1. UDF 的 DataFrame 输出基于一些过滤器写入多个流/接收器。

问题:

  1. 每个接收器都为 UDF 生成的 UniqueId 获取一个新值。我如何为所有接收器保持相同的 UniqueId。
  2. 如果每个接收器获得不同的 UniqueId 值,是否意味着我的 UDF 被每个接收器多次调用?
  3. 如果 UDF 被调用两次,有什么选择可以让它被调用一次,然后将相同的数据写入不同的接收器
inData = spark.readstream().format("eventhub")

udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)

filter1 = udfdata.filter("column =='filter1'")
filter 2 = udfdata.filter("column =='filter2'")

# write filter1 to two differnt sinks
filter1.writestream().format(delta).start(table1)
filter1.writestream().format(eventhub).start()

# write filter2 to two differnt sinks
filter2.writestream().format(delta).start(table2)
filter2.writestream().format(eventhub).start()

最佳答案

每次调用 .writestream()....start() 时,您都在创建一个新的独立流式查询。

这意味着对于您定义的每个输出接收器,Spark 将从输入源再次读取并处理数据帧。

如果您只想读取和处理一次,然后输出到多个接收器,您可以使用 foreachBatch 接收器作为解决方法:

inData = spark.readstream().format("eventhub")
udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)

udfdata.writeStream().foreachBatch(filter_and_output).start()
def filter_and_output(udfdata, batchId):
# At this point udfdata is a batch dataframe, no more a streaming dataframe
udfdata.cache()
filter1 = udfdata.filter("column =='filter1'")
filter2 = udfdata.filter("column =='filter2'")

# write filter1
filter1.write().format(delta).save(table1)
filter1.write().format(eventhub).save()

# write filter2
filter2.write().format(delta).save(table2)
filter2.write().format(eventhub).save()

udfdata.unpersist()

您可以在 Spark Structured Streaming documentation 中了解有关 foreachBatch 的更多信息.

回答你的问题

  1. 如果您使用 foreachBatch,您的数据将只被处理一次,并且所有接收器都将拥有相同的 UniqueId
  2. 使用 foreachBatch 将解决问题

关于apache-spark - 结构化流写入多个流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57367226/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com