gpt4 book ai didi

python - Python 函数上的 Spark UDF

转载 作者:行者123 更新时间:2023-12-01 00:19:51 26 4
gpt4 key购买 nike

我创建了一个 python 函数,用于使用 GCP Translate API 翻译短字符串。这些代码执行类似的操作。

def translateString(inputString, targetLanguage, apiKey):
baseUrl = "https://translation.googleapis.com/language/translate/v2?key="
q = "&q="
gcpKey = apiKey
target = "&target="
sentence = str(inputString)

#Finialize request url
url = baseUrl + gcpKey + q + sentence + target

#SEND REQUEST WITH EXPONENTIAL BACK OFF IN CASE OF ERRORS OF EXCEEDING QUOTA LIMITATIONS API
session = requests.Session()
retry = Retry(connect=3, backoff_factor=100)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
respons = session.get(url, timeout=120)

if respons.status_code == 200:
data = respons.json()
translatedStr = data["data"]["translations"][0]["translatedText"]
returnString = str(translatedStr)
return(returnString)

else:
return("Error with code: " + str(respons.status_code))

udfTrans = F.udf(translateString, StringType())

apiKey = *********

dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", udfTrans(lit(dfToProcess.FIELD_TO_PROCESS), lit("no"), lit(apiKey)))

当循环遍历 pd.DataFrame 并存储返回变量时,这非常有用!但现在我需要在 spark.DataFrame 上应用此函数,以便可以分发工作并创建了以下 udfTrans = F.udf(translateString, StringType())以便它可以应用于 spark.DataFrame 中的 string 列。

当我运行 UDF 时dfTempNo = dfToProcess.withColumn("TRANSLATED_FIELD", udfTrans(lit(dfToProcess.FIELD_TO_PROCESS), lit("no"), lit(apiKey)))它不会返回任何错误,但需要永远在超过 1 行的 dfToProcess 上运行。

我不确定我是否误解了 UDF 如何应用于 spark.DataFrame 中的列。是否有可能使用 UDF 将这样的函数应用于 spark.DataFrame 还是我在 Python/Pandas 中这样做会更好?

最佳答案

Python udf 无法像这样并行化,因为您的 executor 需要回调 driver 来执行您的 >udf。不幸的是,这意味着您的 udf 将阻塞每一行,并且在执行过程中本质上是串行的。

可以使用不同的方法更有效地解决这个问题。由于您的函数严重受 IO 限制(更具体地说是网络限制),因此您可以查看诸如 ThreadPool 实现之类的内容,将输出存储在 Dict 中,然后调用 SparkContext .parallelize() 在您的 Dict 上并从那里开始。

或者,您可以在 scala 中编写 udf,因为它将自动并行执行。

或者,看看 https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf因为 pandas udf 可以矢量化。希望这有帮助!

关于python - Python 函数上的 Spark UDF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59030545/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com