
java - Spark 1.6: How do I convert an RDD generated from a Scala jar to a PySpark RDD?


I am trying to put together some proof-of-concept code that demonstrates how to call a Scala function from PySpark so that the result is a PySpark RDD.

Here is the code on the Scala side:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PySpark extends Logger {

  def getTestRDD(sc: SparkContext): RDD[Int] = {
    sc.parallelize(List.range(1, 10))
  }

}

And this is what I do on the PySpark side to access it:

>>> foo = sc._jvm.com.clickfox.combinations.lab.PySpark
>>> jrdd = foo.getTestRDD(sc._jsc.sc())
>>> moo = RDD(jrdd, sc._jsc.sc())
>>> type(moo)
<class 'pyspark.rdd.RDD'>

So far so good - what I get back appears to be an instance of PySpark.RDD. The problem appears when I try to use the RDD:

>>> moo.take(1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 1267, in take
totalParts = self.getNumPartitions()
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 356, in getNumPartitions
return self._jrdd.partitions().size()
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o25.size. Trace:
py4j.Py4JException: Method size([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

I also tried passing in the PySpark context instead of the Java one to see what would happen:

>>> moo = RDD(jrdd, sc)
>>> moo.collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/rdd.py", line 771, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/local/spark-1.6.3-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o21.rdd. Trace:
py4j.Py4JException: Method rdd([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

Still no luck. Is there a way to convert, or at least access, the data inside the Java RDD from PySpark?

EDIT: I know that I could convert the RDD to an array on the Java side and iterate over the resulting JavaArray object (a rough sketch of that follows below), but I would like to avoid that if possible.
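For reference, that workaround would look roughly like the sketch below: collect the Scala RDD to an array on the JVM side and walk the resulting py4j proxy from Python, which materializes everything on the driver.

# Rough sketch of the workaround I would like to avoid.
jrdd = sc._jvm.com.clickfox.combinations.lab.PySpark.getTestRDD(sc._jsc.sc())
java_array = jrdd.collect()  # py4j proxy of a JVM array, not a Python list
values = [java_array[i] for i in range(len(java_array))]  # plain Python ints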

Best Answer

what I get back appears to be an instance of PySpark.RDD.

Just because it is a valid PySpark RDD doesn't mean that Python can understand its contents. What you are passing around is an RDD of Java objects. For its internal conversions, Spark uses Pyrolite to re-serialize objects between Python and the JVM.

This is an internal API, but you can use it:

# In Spark 1.6.x, _java2py lives in pyspark.mllib.common
# (pyspark.ml.common only exists from Spark 2.0 onwards)
from pyspark.mllib.common import _java2py

rdd = _java2py(
    sc, sc._jvm.com.clickfox.combinations.lab.PySpark.getTestRDD(sc._jsc.sc()))

Note that this approach is fairly limited and supports only basic type conversions.
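Assuming the conversion succeeds, rdd now behaves like any other PySpark RDD, so the calls that failed earlier should work. A small sketch continuing the snippet above (the expected values follow from the getTestRDD shown in the question):

# rdd is the result of the _java2py call above: a PySpark RDD backed by
# pickled Python ints, so ordinary transformations and actions work.
print(rdd.getNumPartitions())          # no longer raises Py4JError
print(rdd.take(3))                     # expected: [1, 2, 3]
print(rdd.map(lambda x: x * 2).sum())  # expected: 90 for List.range(1, 10)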

You can also replace the RDD with a DataFrame:

import org.apache.spark.sql.{DataFrame, SQLContext}

object PySpark {
  def getTestDataFrame(sqlContext: SQLContext): DataFrame = {
    sqlContext.range(1, 10)
  }
}

Then wrap the returned object on the PySpark side:

from pyspark.sql.dataframe import DataFrame

DataFrame(
    sc._jvm.com.clickfox.combinations.lab.PySpark.getTestDataFrame(
        sqlContext._ssql_ctx),  # JVM SQLContext handle in Spark 1.6 (_jsqlContext in 2.x)
    sqlContext)
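The returned object is a regular PySpark DataFrame, so Spark handles the Python/JVM serialization for you, and DataFrame.rdd gives back an ordinary PySpark RDD of Rows. A short usage sketch under the same assumptions (the id column name comes from SQLContext.range):

df = DataFrame(
    sc._jvm.com.clickfox.combinations.lab.PySpark.getTestDataFrame(
        sqlContext._ssql_ctx),
    sqlContext)

df.printSchema()  # a single "id" column of type long
print(df.rdd.map(lambda row: row.id).collect())  # expected: [1, 2, ..., 9]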

Regarding "java - Spark 1.6: How do I convert an RDD generated from a Scala jar to a PySpark RDD?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43812365/
