gpt4 book ai didi

python - 如何在 Spark 的 map 函数中使用数据框?

转载 作者:太空狗 更新时间:2023-10-30 01:20:49 56 4
gpt4 key购买 nike

定义:

  • sampleDF是具有用于查找目的的列表记录的示例数据框。
  • sampleDS是一个包含元素列表的 RDD。
  • mappingFunction就是查找sampleDS的元素在 sampleDF如果它们存在于 sampleDF 中,则将它们映射到 1如果他们不这样做,则为 0。

我有一个映射函数如下:

def mappingFunction(element):
# The dataframe lookup!
lookupResult = sampleDF.filter(sampleDF[0] == element).collect()
if len(lookupResult) > 0:
print lookupResult
return 1
return 0

问题:

正在访问 sampleDF在映射函数之外工作得很好,但是一旦我在函数内部使用它,我就会收到以下错误:

py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:744)

我还尝试了什么:

我确实尝试保存一个临时表并使用 sqlContext在 map 功能中选择,但仍然无法正常工作。这是我得到的错误:

  File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems
save(v)
File "/usr/lib64/python2.6/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/opt/spark/python/pyspark/cloudpickle.py", line 542, in save_reduce
save(state)
File "/usr/lib64/python2.6/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
save(v)
File "/usr/lib64/python2.6/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable

我的要求:

我试图通过简单的例子来简化我的问题。非常感谢任何有关如何在 map 函数内部使用数据框的帮助。

最佳答案

这是不可能的。 Spark 不支持分布式数据结构(RDDsDataFramesDatasets)的嵌套操作。即使它执行大量作业也不是一个好主意。根据您展示的代码,您可能希望将 RDD 转换为 DataFrame 并执行 join wit

(rdd.map(x => (x, )).toDF(["element"])
.join(sampleDF, sampleDF[0] == df[0])
.groupBy("element")
.agg(count("element") > 0))

边注 map 中的打印是完全无用的,更不用说它会增加额外的 IO 开销。

关于python - 如何在 Spark 的 map 函数中使用数据框?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35859351/

56 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com