gpt4 book ai didi

azure - 您似乎正在尝试从广播中引用 SparkContext

转载 作者:行者123 更新时间:2023-12-02 06:59:03 27 4
gpt4 key购买 nike

我正在尝试使用 Spark 结构化流处理一些事件。

传入事件如下所示:

事件1:

<表类=“s-表”><标题>网址 <正文>http://first/path/to/read/from...

事件2:

<表类=“s-表”><标题>网址 <正文>http://second/path/to/read/from...

等等。

我的目标是读取每个 URL 并生成一个新的 DF。到目前为止,我已经用这样的代码完成了,我做了 collect() .


def createDF(url):

file_url = "abfss://" + container + "@" + az_storage_account + ".dfs.core.windows.net/" + az_storage_folder + "/" + url

""" Read data """
binary = spark.read.format("binaryFile").load(file_url)
""" Do other operations """
...

""" save the data """
# write it into blob again

return something

def loadData(batchDf, batchId):


"""
batchDf:
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
| body|partition| offset|sequenceNumber| enqueuedTime|publisher|partitionKey| properties|systemProperties| url|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
|[{"topic":"/subsc...| 0|30084343744| 55489|2021-03-03 14:21:...| null| null|[aeg-event-type -...| []|http://path...|
+--------------------+---------+-----------+--------------+--------------------+---------+------------+--------------------+----------------+--------------------+
"""

""" Before ....

df = batchDf.select("url")
url = df.collect()

[createDF(item) for item in url]
"""
# Now without collect()
# Select the url field of the df
url_select_df = batchDf.select("url")

# Read url value
result = url_select_df.rdd.map(lambda x: createDF(x.url))

query = df \
.writeStream \
.foreachBatch(loadData) \
.outputMode("update") \
.queryName("test") \
.start() \
.awaitTermination()

但是,当我想提取 URL 而不收集时,我收到以下错误消息:

您似乎正在尝试从广播引用 SparkContext

可能会发生什么?

非常感谢您的帮助

最佳答案

如果没有调用collect,数据帧url_select_df将分布在执行器之间。然后,当您调用 map 时,lambda 表达式将在执行器上执行。由于 lambda 表达式正在调用使用 SparkContext 的 createDF,因此您会收到异常,因为无法在执行器上使用 SparkContext。

看起来您已经找到了解决方案,即将数据帧收集到驱动程序并在那里应用 lambda 表达式。

只需确保您的驱动程序没有重载(基于内存)。

关于azure - 您似乎正在尝试从广播中引用 SparkContext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66458872/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com