gpt4 book ai didi

python - 如何使用 dask 读取 csv 并处理行?

转载 作者:行者123 更新时间:2023-12-01 08:27:27 24 4
gpt4 key购买 nike

我想读取 28Gb csv 文件并打印内容。但是,我的代码:

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

def hash_string(self, string):
return md5(string.encode('utf-8')).hexdigest()

def dbproc(self, db):
db[self.hash_string(self.row)] = self.row

def index_row(self, row):
self.row = row
DB.process(self.dbproc, "index.kch")

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes') # convert to pandas
df = df.to_dict(orient='records')
for row in df:
ob.index_row(row)
print("Total time:")
print(datetime.utcnow-start_time)

不工作。当我运行命令 htop 时,我可以看到 dask 正在运行,但没有任何输出。也没有创建任何index.kch 文件。我在不使用 dask 的情况下提示了同样的事情,并且运行良好;我使用的是 Pandas 流 api (chunksize),但速度太慢,因此我想使用 dask。

最佳答案

df = df.compute(scheduler='processes')     # convert to pandas

不要这样做!

您正在单独的进程中加载​​这些片段,然后将所有要拼接的数据传输到主进程中的单个数据帧中。这只会增加处理开销,并在内存中创建数据副本。

如果您想要做的只是(出于某种原因)将每一行打印到控制台,那么您将非常适合使用 Pandas 流式 CSV 阅读器 (pd.read_csv(chunksize=..) )。您可以使用Dask的分块来运行它,并且如果您在读取数据的工作线程中进行打印,则可能会获得加速:

df = dd.read_csv(..)

# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
for row in df:
print(row)

dask.compute(*[print_a_block(d) for d in df.to_delayed()])

请注意,for row in df 实际上会获取列,也许您想要 iterrows,或者也许您实际上想以某种方式处理数据。

关于python - 如何使用 dask 读取 csv 并处理行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54148429/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com