- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我需要在 Spark DataFrame 上进行分布式计算,在 DataFrame block 上调用一些任意(非 SQL)逻辑。我做了:
def some_func(df_chunk):
pan_df = df_chunk.toPandas()
#whatever logic here
df = sqlContext.read.parquet(...)
result = df.mapPartitions(some_func)
不幸的是,它会导致:
AttributeError: 'itertools.chain' object has no attribute 'toPandas'
我希望在每次映射调用中都有 spark DataFrame 对象,但我得到的是“itertools.chain”。为什么?以及如何克服这个问题?
最佳答案
试试这个:
>>> columns = df.columns
>>> df.rdd.mapPartitions(lambda iter: [pd.DataFrame(list(iter), columns=columns)])
关于python - Spark DataFrame mapPartitions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38749179/
我目前有一个 mapPartitions 作业,它是 flatMapping 中的每个值迭代器,我遇到了一个问题,其中会产生主要的 GC 成本在某些处决上。一些执行者需要 20 分钟,其中 15 分钟
我正在使用 Spark 的 python api。 我有一个大文本,我用 rdd = sc.loadtxt("file.txt") 加载。 之后,我想对rdd进行mapPartitions转换。 但是
我有以下 RDD,它有 4 个分区:- val rdd=sc.parallelize(1 to 20,4) 现在我尝试在此调用 mapPartitions:- scala> rdd.mapPartit
我有一个 FlatMapFunction>> 的实现类,String>。为每个分区初始化一些不可串行的连接。但是当我在迭代器上调用 next() 时,它为多个分区提供相同的记录。代码如下: @Over
我在scala中有这个代码 object SimpleApp { def myf(x: Iterator[(String, Int)]): Iterator[(String, Int)] = {
我需要在 Spark DataFrame 上进行分布式计算,在 DataFrame block 上调用一些任意(非 SQL)逻辑。我做了: def some_func(df_chunk): p
所以我正在尝试使用 Python (Pyspark) 来学习 Spark。我想知道函数 mapPartitions 是如何工作的。这就是它需要的输入和它给出的输出。我在互联网上找不到任何合适的例子。可
我可以在 pyspark mapPartitions 中使用多线程吗? 我正在运行一个 spark 作业,我必须在其中对每一行进行 API 调用。我们正在使用 rdd map 运行 python 函数
假设我有以下数据框: var randomData = Seq(("a",8),("h",5),("f",3),("a",2),("b",8),("c",3) val df = sc.parallel
在某些情况下,我可以使用mapPartitions或foreach方法获得相同的结果。 例如,在典型的 MapReduce 方法中,我们会在将原始 RDD 转换为元组(键、值)集合的 mapParti
我试图在 Scala 中使用 mapPartitions 但出现以下错误。 [error] found : Unit [error] required: Iterator[?] [error]
我阅读了很多有关 map 和 mapPartitions 之间差异的文章。我仍然有一些疑问。事情是在阅读之后我决定在我的代码中更改 mapPartitions 的 map 函数,因为显然 mapPar
RDD's 之间有什么区别? map 和 mapPartitions 方法? flatMap 的行为类似于 map 还是 mapPartitions?谢谢。 (编辑)即之间有什么区别(语义上或执行方面
我有 DocsRDD : RDD[String, String] val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2) DocsRDD: Doc1.
我通读了 map 和 mapPartitions 之间的理论差异,并且很清楚何时在各种情况下使用它们。 但我下面描述的问题更多是基于 GC Activity 和内存 (RAM)。请阅读下面的问题:-
我对 MapPartition 没有搞清楚。请有人解释一下 Mappartition 的一些用例以及它与 FlatMap 有何不同? 最佳答案 区别在于方法的接口(interface)以及它们的调用方
我有一些中间数据需要存储在 HDFS 和本地。我正在使用 Spark 1.6。在作为中间形式的 HDFS 中,我在 /output/testDummy/part-00000 和 /output/tes
在 Spark 中,您可以为 mapPartitions 使用用户定义的函数。现在我的问题是如何向它传递参数。例如,目前我有这样的东西,它使用 rdd.mapPartitions(userdefine
我想知道使用 Spark 有什么不同 mapPartitions功能与 transient 惰性值。 由于每个分区基本上都在不同的节点上运行,因此将在每个节点上创建一个 transient 惰性 va
我是 Spark 和 Scala 的新手,这就是为什么我很难通过它。 我打算做的是使用 Spark 使用 Stanford CoreNLP 预处理我的数据。我知道我必须使用 mapPartitions
我是一名优秀的程序员,十分优秀!