- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个函数可以将 df 的内容写入 csv 文件。
def writeToCSV(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
headers = []
fid = open(defFile, 'r')
for line in fid:
headers.append(line.replace('\r','').split('\n')[0].split('\t')[0])
df = pd.DataFrame([], columns=headers)
for header in outDf.columns.values:
if header in headers:
df[header] = outDf[header]
df.to_csv(toFile, sep=delim, quotechar=quotechar, index=False, encoding='utf-8')
我怎样才能并行化这个过程?目前我正在使用以下代码
def writeToSchemaParallel(outDf, defFile, toFile, retainFlag=True, delim='\t', quotechar='"'):
logInfo('Start writingtoSchema in parallel...', 'track')
headers = []
fid = open(defFile, 'r')
for line in fid:
headers.append(line.replace('\r','').split('\n')[0].split('\t')[0])
df = pd.DataFrame([], columns=headers)
for header in outDf.columns.values:
if header in headers:
df[header] = outDf[header]
out_Names = Queue()
cores = min([int(multiprocessing.cpu_count() / 2), int(len(outDf) / 200000)+1])
#cores=4
logInfo(str(cores) + 'cores are used...', 'track')
# split the data for parallel computation
outDf = splitDf(df, cores)
# process the chunks in parallel
logInfo('splitdf called are df divided...', 'track')
Filenames=[]
procs = []
fname=toFile.split("_Opera_output")[0]
for i in range(0, cores):
filename=fname+"_"+str(i)+".tsv"
proc = Process(target=writeToSchema, args=(outDf[i], defFile,filename, retainFlag,delim, quotechar,i))
procs.append(proc)
proc.start()
print 'processing '+str(i)
Filenames.append(filename)
# combine all returned chunks
# outDf = out_Names.get()
# for i in range(1, cores):
# outDf = outDf.append(out_q.get(), ignore_index=True)
for proc in procs:
proc.join()
logInfo('Now we merge files...', 'track')
print Filenames
with open(toFile,'w') as outfile:
for fname in Filenames:
with open(fname) as infile:
for line in infile:
outfile.write(line)
但它没有工作并给出以下错误
2017-12-17 16:02:55,078 - track - ERROR: Traceback (most recent call last):
2017-12-17 16:02:55,078 - track - ERROR: File "C:/Users/sudhir.tiwari/Document
s/AMEX2/Workspace/Backup/Trunk/code/runMapping.py", line 257, in <module>
2017-12-17 16:02:55,089 - track - ERROR: writeToSchemaParallel(outDf, defFile, t
oFile, retainFlag, delim='\t', quotechar='"')
2017-12-17 16:02:55,153 - track - ERROR: File "C:\Users\sudhir.tiwari\Document
s\AMEX2\Workspace\Backup\Trunk\code\utils.py", line 510, in writeToSchemaParalle
l
2017-12-17 16:02:55,163 - track - ERROR: with open(fname) as infile:
2017-12-17 16:02:55,198 - track - ERROR: IOError
2017-12-17 16:02:55,233 - track - ERROR: :
2017-12-17 16:02:55,233 - track - ERROR: [Errno 2] No such file or directory: 'C
:/Users/sudhir.tiwari/Documents/AMEX2/Workspace/Input/work/Schindler_20171130/Sc
hindler_20171130_0.tsv'
而且它没有写入文件,因为当我搜索没有找到文件的位置时。我正在使用多处理将数据帧写入多个文件,然后合并所有文件。 split df 将dataframe分成n份。
最佳答案
使用多处理方式会比默认方式(直接保存)消耗更多时间。通过使用 Synchronization between processes , 使用Processes 和Lock 并行写入进程。以下是 POC 示例。
import pandas as pd
import numpy as np
from multiprocessing import Lock, Process
from time import time
def writefile(df,l):
l.acquire()
df.to_csv('dataframe-multiprocessing.csv', index=False, mode='a', header=False)
l.release()
if __name__ == '__main__':
a = np.random.randint(1,1000,10000000)
b = np.random.randint(1,1000,10000000)
c = np.random.randint(1,1000,10000000)
df = pd.DataFrame(data={'a':a,'b':b,'c':c})
print('Iterative way:')
print()
new = time()
df.to_csv('dataframe-conventional.csv', index=False, mode='a', header=False)
print(time() - new, 'seconds')
print()
print('Multiprocessing way:')
print()
new = time()
l = Lock()
p = Process(target=writefile, args=(df,l))
p.start()
p.join()
print(time() - new, 'seconds')
print()
df1 = pd.read_csv('dataframe-conventional.csv')
df2 = pd.read_csv('dataframe-multiprocessing.csv')
print('If both file same or not:')
print(df1.equals(df2))
结果:
C:\Users\Ariff\Documents\GitHub\testing-code>python pandas_multip.py
Iterative way:
18.323541402816772 seconds
Multiprocessing way:
20.14128303527832 seconds
If both file same or not:
True
关于python - 如何在python中使用多处理将df的内容写入csv文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47859537/
努力理解标题中 5 个示例之间的区别。系列与数据框有一些用例吗?什么时候应该使用一个而不是另一个?哪些是等价的? 最佳答案 df[x] — 使用变量 x 索引列。返回 pd.Series df[[x]
在使用Jupyter Notebook时,我必须为问题标题中提到的df.info()、df.head()等单独留出空格. 有没有办法像第二张图片那样把所有这些都放在一个 block 中,并显示所有信息
我想求三列之和,我采取的方法如下: In [14]: a_pd = pd.DataFrame({'a': np.arange(3), 'b': [5, 7,
我想我们大多数人已经使用过这样的东西(至少如果你正在使用 tidyverse): library(tidyverse) example % select(- mpg) 我的问题: 我知道这部分有一
我有一个 DF,里面有大约 20,000 行。我构建了一个 Python 脚本来对这些数据(包括数据透视表)运行大量清理和数学运算。 我想将此 DF 拆分为 3 个独立的 DF,然后根据列值将这 3
我什至不知道如何表达这一点,但在 Python 中有没有一种方法可以引用等号之前的文本,而无需实际再次编写? ** 编辑 - 我在 Jupyter 中使用 python3 我似乎用了半辈子的时间来写作
在 df1 中,每个单元格值都是我想要从 df2 中获取的行的索引。 我想获取 df2 trial_ms 列中行的信息,然后根据获取的 df2 列重命名 df1 中的列。 可重现的 DF: # df1
我想转换此表 0 thg John 3.0 1 thg James 4.0 2 mol NaN 5.0 3 mol NaN NaN 4
我有一个数据框,我想从中提取 val 中的值大于 15 以及 val 不是 NA: df[ !is.na(df$val) & df$val > 15, ] 由于我假设在 R 中经常需要这样的比较,所
鉴于 coming deprecation of df.ix[...] 如何替换这段代码中的 .ix? df_1 = df.ix[:, :datetime.time(16, 50)] d
任何我可以帮助我说出 Pandas 中这两个语句之间的区别-python df.where(df['colname'] == value) 和 df[(df['colname'] == value)]
考虑 df Index A B C 0 20161001 0 24.5 1 20161001 3 26.5 2
所以我需要按“fh_status”列对行进行分组,然后对每个组执行“gini”的最小值、平均值和最大值(将有三个)。我想出了这段代码: m = (df2.groupby(['fh_status']).
我尝试计算不同公司/股票的一些 KPI。我的股票信息位于 df 中,具有以下结构 Ticker Open High Low Ad
我有一个看起来像这样的 df: gene ID Probe ID Chromosome Start Stop 1: H3F3A 539154271
nn_idx_df 包含与 xyz_df 的索引匹配的索引值。如何从 xyz_df 中的 H 列获取值并在 nn_idx_df 中创建新列以匹配 output_df 中所示的结果。我可以解决这个问题,
我目前的 DF 看起来像这样 Combinations Count 1 ('IDLY', 'VADA') 3734 6 ('DOSA', 'IDLY')
我看到了几个与此相关的问题,但我发现这些技巧都不起作用。 我正在尝试根据第二个数据帧的值填充数据帧的所有 NaN 值。第一个 df 很大,第二个 df 将充当某种键。 DF1 Par
我有两个数据帧,df1 和 df2。每个数据帧的唯一标识符是“ID”和“Prop_Number”。我需要将 df1 中的 Num1、2 和 3 列复制到 df2、1_Num 中的相应列...但我不确定
我有以下数据框: 注意:日期是索引 city morning afternoon evening midnight date 2014-05-01 Y
我是一名优秀的程序员,十分优秀!