gpt4 book ai didi

python - 如何对包含 R 函数的 pyspark RDD 进行分区

转载 作者:太空宇宙 更新时间:2023-11-03 11:25:34 25 4
gpt4 key购买 nike

import rpy2.robjects as robjects

dffunc = sc.parallelize([(0,robjects.r.rnorm),(1,robjects.r.runif)])
dffunc.collect()

输出

[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]

虽然分区版本导致错误:

dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()

RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>

这个错误似乎是 R 没有加载到其中一个分区上,我认为这意味着没有执行第一个导入步骤。有没有办法解决?

EDIT 1 第二个例子让我认为 pyspark 或 rpy2 的计时存在错误。

dffunc = sc.parallelize([(0,robjects.r.rnorm),     (1,robjects.r.runif)]).partitionBy(2)
def loadmodel(model):
import rpy2.robjects as robjects
return model[1](2)
dffunc.map(loadmodel).collect()

产生相同的错误 R cannot evaluate code before being initialized。

dffuncpickle = sc.parallelize([(0,pickle.dumps(robjects.r.rnorm)),(1,pickle.dumps(robjects.r.runif))]).partitionBy(2)
def loadmodelpickle(model):
import rpy2.robjects as robjects
import pickle
return pickle.loads(model[1])(2)
dffuncpickle.map(loadmodelpickle).collect()

按预期工作。

最佳答案

我想说“这不是 rpy2 中的错误,这是一个功能”,但实际上我不得不承认“这是一个限制”。

发生的事情是 rpy2 有 2 interface levels .一个是低级接口(interface)(更接近 R 的 C API),可通过 rpy2.rinterface 获得,另一个是高级接口(interface),具有更多的花里胡哨、更“pythonic”,并且具有继承自 rinterface level-ones 的 R 对象的类(最后一部分对于下面关于 pickling 的部分很重要)。导入高级接口(interface)会导致在必要时使用默认参数初始化(启动)嵌入式 R。导入低级接口(interface) rinterface 没有这种副作用,嵌入式 R 的初始化必须显式执行(函数 initr)。 rpy2 是这样设计的,因为嵌入式 R 的初始化可以有参数:首先导入 rpy2.rinterface,设置初始化,然后导入 rpy2.robjects 使这成为可能。

除此之外,由 rpy2 包装的 R 对象的序列化(pickling)目前仅定义在 rinterface 级别(参见 documentation)。腌制 robjects-level(高级)rpy2 对象使用 rinterface-level 代码,当 unpickling 它们时,它们将保持在较低级别(Python pickle 包含module 对象的类在其中定义并将导入该模块 - 这里 rinterface,这并不意味着嵌入式 R 的初始化)。事情变成这样的原因很简单,它“现在已经足够好了”:在实现它的时候,我不得不同时想出一种很好的方法来桥接两种不同的语言,并通过 Python C-API 和酸洗/解酸洗 Python 对象。考虑到可以轻松编写类似的东西

import rpy2.robjects

import rpy2.rinterface
rpy2.rinterface.initr()

在 unpickling 之前,这从未被重新访问过。我所知道的 rpy2 酸洗的用途是使用 Python 的 multiprocessing(并在初始化子进程的代码中添加类似于 import 语句的内容是一种廉价且足够的修复方法)。愿现在是时候再看看这个了。如果是的话,请为 rpy2 提交错误报告。

编辑:这无疑是 rpy2 的问题。 pickled robjects 级别的对象应该unpickle回到robjects级别,而不是rinterface级别。我开了一个issue in the rpy2 tracker (并且已经在 default/dev 分支中推送了一个基​​本补丁)。

第二次编辑:该补丁是从版本 2.7.7 开始发布的 rpy2 的一部分(撰写本文时的最新版本是 2.7.8)。

关于python - 如何对包含 R 函数的 pyspark RDD 进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34669751/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com