gpt4 book ai didi

python - 使用 Cython 时,可以替代在 PySpark mapPartitions 中使用嵌套函数吗?

转载 作者:行者123 更新时间:2023-12-01 07:52:48 35 4
gpt4 key购买 nike

我希望对数据框执行按行操作,该操作接受一些固定变量作为参数。我知道如何做到这一点的唯一方法是使用嵌套函数。我尝试使用 Cython 编译部分代码,然后从 mapPartitions 中调用 Cython 函数,但它引发了错误 PicklingError: Can't pickle <cyfunction outer_function.<locals>._nested_function at 0xfffffff> .

当使用纯Python时,我这样做

def outer_function(fixed_var_1, fixed_var_2):
def _nested_function(partition):
for row in partition:
yield dosomething(row, fixed_var_1, fixed_var_2)
return _nested_function

output_df = input_df.repartition(some_col).rdd \
.mapPartitions(outer_function(a, b))

现在我有 outer_function在单独的文件中定义,如下所示

# outer_func.pyx

def outer_function(fixed_var_1, fixed_var_2):
def _nested_function(partition):
for row in partition:
yield dosomething(row, fixed_var_1, fixed_var_2)
return _nested_function

还有这个

# runner.py

from outer_func import outer_function

output_df = input_df.repartition(some_col).rdd \
.mapPartitions(outer_function(a, b))

这会引发上面的酸洗错误。

我看过https://docs.databricks.com/user-guide/faq/cython.html并试图获取 outer_function 。尽管如此,还是会出现同样的错误。问题是嵌套函数没有出现在模块的全局空间中,因此无法找到并序列化它。

我也尝试过这样做

def outer_function(fixed_var_1, fixed_var_2):
global _nested_function
def _nested_function(partition):
for row in partition:
yield dosomething(row, fixed_var_1, fixed_var_2)
return _nested_function

这会引发不同的错误 AttributeError: 'module' object has no attribute '_nested_function' .

在这种情况下有什么办法不使用嵌套函数吗?或者是否有另一种方法可以使嵌套函数“可序列化”?

谢谢!

编辑:我也尝试过这样做

# outer_func.pyx

class PartitionFuncs:

def __init__(self, fixed_var_1, fixed_var_2):
self.fixed_var_1 = fixed_var_1
self.fixed_var_2 = fixed_var_2

def nested_func(self, partition):
for row in partition:
yield dosomething(row, self.fixed_var_1, self.fixed_var_2)
# main.py

from outer_func import PartitionFuncs

p_funcs = PartitionFuncs(a, b)
output_df = input_df.repartition(some_col).rdd \
.mapPartitions(p_funcs.nested_func)

我仍然得到 PicklingError: Can't pickle <cyfunction PartitionFuncs.nested_func at 0xfffffff> 。哦,好吧,这个想法行不通。

最佳答案

这是一个一半的答案,因为当我尝试你的 class PartitionFuncs 方法 p_funcs.nested_func pickled/unpickled 对我来说很好(我没有尝试结合不过,它与 PySpark 一起使用),所以下面的解决方案是否必要可能取决于您的 Python 版本/平台等。 Pickle should support bound methods from Python 3.4 ,但它看起来像 PySpark forces the pickle protocol to 3 ,这将停止该工作。可能有办法改变这一点,但我不知道。

众所周知,嵌套函数是不可pickle的,因此这种方法绝对有效。类方法是正确的。

我在评论中的建议是尝试对类进行酸洗,而不是绑定(bind)函数。为此,该类的实例需要可调用,因此您将函数重命名为 __call__

class PartitionFuncs:
def __init__(self, fixed_var_1, fixed_var_2):
self.fixed_var_1 = fixed_var_1
self.fixed_var_2 = fixed_var_2

def __call__(self, partition):
for row in partition:
yield dosomething(row, self.fixed_var_1, self.fixed_var_2)

这确实取决于默认情况下可pickle的两个fixed_var变量。如果不是,您可以写 custom saving and loading methods, as described in the pickle documentation .

正如您在评论中指出的那样,这确实意味着您定义的每个函数都需要一个单独的类。这里的选项涉及继承,因此有一个单独的 PickleableData 类,每个 Func 类都可以保存对它的引用。

关于python - 使用 Cython 时,可以替代在 PySpark mapPartitions 中使用嵌套函数吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56109264/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com