gpt4 book ai didi

apache-spark - 将附加参数传递给 pyspark 中的 foreachBatch

转载 作者:行者123 更新时间:2023-12-03 16:51:08 28 4
gpt4 key购买 nike

我在 pyspark 结构化流中使用 foreachBatch 使用 JDBC 将每个微批处理写入 SQL Server。我需要对多个表使用相同的过程,并且我想通过为表名添加一个额外的参数来重用相同的编写器函数,但我不确定如何传递表名参数。

示例 here非常有帮助,但在 python 示例中,表名是硬编码的,看起来在 scala 示例中他们引用了一个全局变量(?)我想将表名传递给函数。

上面链接的python示例中给出的函数是:

def writeToSQLWarehose(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

我想使用这样的东西:
def writeToSQLWarehose(df, epochId, tableName):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", tableName) \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

但我不确定如何通过 foreachBatch 传递附加参数。

最佳答案

像这样的事情应该有效。

streamingDF.writeStream.foreachBatch(lambda df,epochId: writeToSQLWarehose(df, epochId,tableName )).start()

关于apache-spark - 将附加参数传递给 pyspark 中的 foreachBatch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55973631/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com