gpt4 book ai didi

apache-spark - 当系列到系列(PandasUDFType.SCALAR)可用时,为什么系列迭代器到系列 pandasUDF(PandasUDFType.SCALAR_ITER)的迭代器?

转载 作者:行者123 更新时间:2023-12-05 05:50:10 25 4
gpt4 key购买 nike

根据函数的输入和输出类型,有不同种类的 pandasUDFType。

有:

系列到系列 PandasUDFType.SCALAR:

from pyspark.sql.functions import pandas_udf, PandasUDFType                                                                                                                                                                                                                                                                                                                                                                                                                   

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
return v + 1

spark.range(10).select(pandas_plus_one("id")).show()

还有系列迭代器到系列迭代器PandasUDFType.SCALAR_ITER:

from pyspark.sql.functions import pandas_udf, PandasUDFType                                                                                                                                                                                                                   

@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
return map(lambda s: s + 1, iterator)

spark.range(10).select(pandas_plus_one("id")).show()

能否请您给我一个简单的用例,它不能通过系列到系列 PandasUDFType.SCALAR 解决,而可以通过 Iterator of Series 到 Iterator of Series PandasUDFType.SCALAR_ITER 解决>。我似乎无法理解在另一个仍然存在的情况下拥有一个的必要性

最佳答案

据官方documentationDatabricks docs ,这两种类型的 Pandas UDF 非常相似,但在某些方面有所不同。除了输入和输出类型的差异,Iterator of Series to Iterator of Series UDF 只能接受单个列作为输入,而 Scalar UDF 可以接受多个输入列。要使 Iterator UDF 采用多个 spark 列,您需要使用 Iterator of multiple Series to Iterator of Series UDF这与 Series 的迭代器到 UDF 的迭代器 基本相同,但采用 p.Series元组 的迭代器作为参数。

迭代器 UDF are said useful时间:

  • 您需要预取输入迭代器
@pandas_udf("long") 
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
threading.Thread(consume, args=(iterator, q)) # prefetch the iterator
for s in q:
yield func(s)
  • 在处理每个批处理之前,您需要进行一些昂贵的状态初始化:
@pandas_udf("long") 
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
s = some_initialization() # initialize states
for x in iterator:
yield func(x, s) # use the state for the whole iterator

但是,文档中的引用引起了一些混淆,因为它指出在内部它与 Series 到 Series 的工作方式相同:

It is also useful when the UDF execution requires initializing somestates although internally it works identically as Series to Seriescase

关于apache-spark - 当系列到系列(PandasUDFType.SCALAR)可用时,为什么系列迭代器到系列 pandasUDF(PandasUDFType.SCALAR_ITER)的迭代器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70553903/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com