gpt4 book ai didi

python - 简单的 dask map_partitions 示例

转载 作者:太空狗 更新时间:2023-10-29 22:05:22 25 4
gpt4 key购买 nike

我阅读了以下 SO thead现在我正在努力理解它。这是我的例子:

import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random

df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) })

def test_f(col_1, col_2):
return col_1*col_2

ddf = dd.from_pandas(df, npartitions=8)

ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

它会在下面生成以下错误。我究竟做错了什么?此外,我不清楚如何将附加参数传递给 map_partitions 中的函数?

---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
136 try:
--> 137 yield
138 except Exception as e:

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
3130 with raise_on_meta_error(funcname(func)):
-> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3132

TypeError: test_f() got an unexpected keyword argument 'columns'

During handling of the above exception, another exception occurred:

ValueError Traceback (most recent call last)
<ipython-input-9-913789c7326c> in <module>()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
469 >>> ddf.map_partitions(func).clear_divisions() # doctest: +SKIP
470 """
--> 471 return map_partitions(func, self, *args, **kwargs)
472
473 @insert_meta_param_description(pad=12)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
3163
3164 if meta is no_default:
-> 3165 meta = _emulate(func, *args, **kwargs)
3166
3167 if all(isinstance(arg, Scalar) for arg in args):

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
3129 """
3130 with raise_on_meta_error(funcname(func)):
-> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
3132
3133

~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
75 value = type()
76 try:
---> 77 self.gen.throw(type, value, traceback)
78 except StopIteration as exc:
79 # Suppress StopIteration *unless* it's the same exception that

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
148 ).format(" in `{0}`".format(funcname) if funcname else "",
149 repr(e), tb)
--> 150 raise ValueError(msg)
151
152

ValueError: Metadata inference failed in `test_f`.

Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)

Traceback:
---------
File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error
yield
File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

最佳答案

map_partitions docs中有一个例子准确地实现正在尝试做的事情:

ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))

当您调用 map_partitions 时(就像您在 pandas.DataFrame 上调用 .apply() 时),您尝试调用的函数map(或apply)将被赋予数据帧作为第一个参数。

dask.dataframe.map_partitions 的情况下,第一个参数将是分区,在 pandas.DataFrame.apply 的情况下 - a整个数据框。

这意味着您的函数必须接受数据帧(分区)作为第一个参数,并且在您的情况下可能如下所示:

def test_f(df, col_1, col_2):
return df.assign(result=df[col_1] * df[col_2])

请注意,在您调用 .compute() 之前,在这种情况下会发生(即安排发生)新列的分配。

在您的示例中,您在调用 .compute() 之后分配了列,这违背了使用 dask 的目的。 IE。在您调用 .compute() 之后,该操作的结果将被加载到内存中 如果有足够的空间 用于这些结果(如果没有,您只会得到 MemoryError).

因此,为了您的工作示例,您可以:

1) 使用函数(以列名作为参数):

def test_f(df, col_1, col_2):
return df.assign(result=df[col_1] * df[col_2])


ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')

# Here is good place to do something with BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get) # Will load the whole dataframe into memory

2) 使用 lambda(在函数中硬编码列名):

ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))

# Here is good place to do something with BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get) # Will load the whole dataframe into memory

更新:

要逐行应用函数,这里引用您链接的帖子:

map / apply

You can map a function row-wise across a series with map

df.mycolumn.map(func)

You can map a function row-wise across a dataframe with apply

df.apply(func, axis=1)

即对于您问题中的示例函数,它可能如下所示:

def test_f(dds, col_1, col_2):
return dds[col_1] * dds[col_2]

由于您将逐行应用它,因此函数的第一个参数将是一个系列(即数据框的每一行都是一个系列)。

要应用这个函数,你可以这样调用它:

dds_out = ddf.apply(
test_f,
args=('col_1', 'col_2'),
axis=1,
meta=('result', int)
).compute(get=get)

这将返回一个名为 'result' 的系列。

我想你也可以在每个分区上调用 .apply 一个函数,但它看起来并不比直接在数据帧上调用 .apply 更有效。但也许您的测试会证明并非如此。

关于python - 简单的 dask map_partitions 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47125665/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com