
python - Multiprocessing rows in Pandas to replace with REGEX


I'm working with large data tables, 100M+ rows. On certain columns I need to run regex replacements for many terms. I pre-compile all the terms and store them in a dictionary for later use. The user selects which columns to clean. After replacement, the data is saved out to a different csv file.
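(For illustration, such a pre-compiled mapping might look like the sketch below; scrub_terms and redact are placeholder names, not from the question. Series.replace accepts a dict whose keys are compiled patterns when regex=True:)

import re
import pandas as pd

scrub_terms = ["foo", "bar"]   #placeholder terms to scrub
redact = "XXX"                 #replacement text
#dict of {compiled pattern: replacement}; Series.replace accepts this
# directly when regex=True
scrub_comp = {re.compile(r"\b" + re.escape(t) + r"\b"): redact
              for t in scrub_terms}

s = pd.Series(["some foo text", "no match here"])
s.replace(scrub_comp, regex=True, inplace=True)
print(s.tolist())   #['some XXX text', 'no match here']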

I have a solution for tables that fit in memory, but it isn't multiprocessing-enabled, so it only uses one core.

I'd like to move it to multiprocessing to get those benefits. The most relevant parts of my code are below:

def SendToFile(write_df):
    #i is a module-level flag: 0 means this is the first chunk
    if i == 0:
        write_df.to_csv(writename, mode='w', index=None)
    else:
        #append, and skip the header so it isn't repeated mid-file
        write_df.to_csv(writename, mode='a', header=False, index=None)
    return 1

def CleanTheChunk(clean_df):
    df = clean_df.copy()
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        df[col_name].replace(scrub_comp, regex=True, inplace=True)
    return df

###
#read in data, pre-compile regex terms, select the columns to scrub, etc.
###

if large_data == 0:
    #read in the data
    df = pd.read_csv(filename, dtype='str')

    #clean the file in every column indicated
    for elem in clean_col_index:
        col_name = raw_cols[elem]
        df[col_name].replace(scrub_comp, regex=True, inplace=True)
    #save the cleaned version to file
    df.to_csv(writename, index=None)

else:  #this is how it handles large data
    i = 0  #i tracks whether the first chunk has been written: 'w' or 'a'
    #read the file in chunks
    for chunk in pd.read_csv(filename, chunksize=csize, dtype='str'):
        #clean the chunk
        chunk = CleanTheChunk(chunk)
        #save the chunk
        i = SendToFile(chunk)
print("Jobs done.")

The rows don't affect one another, but they do need to be saved to the new csv in the original order. I just can't work out how to read multiple chunks, process them in parallel, and then write them to the new csv in the correct order.
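(One way to square parallelism with ordered output, sketched under the assumption that CleanTheChunk, filename, writename, and csize are defined as above: multiprocessing.Pool.imap hands chunks to workers in parallel but yields results back in input order, so the writer loop can stay sequential:)

from multiprocessing import Pool
import pandas as pd

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        reader = pd.read_csv(filename, chunksize=csize, dtype='str')
        first = True
        #imap yields cleaned chunks in the same order they were read,
        # even though the workers may finish out of order; the globals
        # CleanTheChunk needs must be defined at module level so the
        # worker processes can see them
        for cleaned in pool.imap(CleanTheChunk, reader):
            cleaned.to_csv(writename, mode='w' if first else 'a',
                           header=first, index=False)
            first = False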

UPDATE: I tried a new approach. I've collapsed all the logic into one function and then call that function with map. I'll try to shorten the function to work through the error I'm getting now.

def MP_Cleaner(chunk):
    #read in the banned terms
    #add escape characters to any control characters in the banned terms
    #create the regex pattern
    #iterate over the columns that need scrubbing
    #use chunk['col_name'].replace(regexterm, regex=True, inplace=True)
    return chunk

def parallelize(data, func):
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    #Pool.map returns results in input order, so concat preserves row order
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data


df = pd.read_csv(filename, dtype='str')
if __name__ == '__main__':
    df_done = parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=None)

    #that is it, all processing is done and the file should be saved
    print("Job Complete, " + writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime())
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed = input("Press Enter to exit:")
    print(proceed)
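(Why this split/map/concat pattern keeps rows in order: np.array_split hands back the pieces in order, Pool.map returns results in the order of its inputs, and pd.concat stitches them back together in that same order. A toy check, illustrative only:)

import numpy as np
import pandas as pd

df = pd.DataFrame({"massive": ["a", "b", "c", "d", "e"]})
parts = np.array_split(df, 3)   #pieces come back in order
assert pd.concat(parts).equals(df)   #order survives the split + concat round trip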

I'm getting an AttributeError: 'list' object has no attribute 'replace'.
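(A likely culprit, judging by the fix in the accepted answer below: calling .replace on the whole list of terms instead of on each string in it. A minimal reproduction with hypothetical terms:)

my_scrub = ["a.b", "c*d"]   #a plain Python list of terms
#my_scrub.replace(".", "\\.")   #AttributeError: 'list' object has no attribute 'replace'
my_scrub = [t.replace(".", "\\.") for t in my_scrub]   #str.replace per element works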

Best Answer

Figured it out. I also used some code from a couple of places: Speed up millions of regex replacements in Python 3

http://blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply/

Writing it up here in case anyone has a similar problem to solve:

This only works for files that fit in RAM; I still have to adapt it for files too large for RAM without giving up any of the gains.

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import re
from multiprocessing import Pool
from trie import Trie

#Enter the filename of the csv to be scrubbed
#After processing, the output will have the prefix "CLEANED_" added to the
# filename provided
filename = "longest-2019-Oct.csv"

#How many cores to use; make sure you leave one for overhead. The entire file
# must fit in RAM for this method to work
cores = 9

#This is the file name for the scrub terms; that file must be in the same
# directory as this script. It must be a single column whose name is "items"
scrubfile = "terms.csv"

#Enter the desired term to cover redactions, default is XXX
redact = "XXX"

#Columns to clean; they must be typed exactly, in "", separated by commas.
# To clean the columns earth, wind, and fire it would be
# ["earth", "wind", "fire"]
cols = ["massive"]

#***************DO NOT CHANGE ANYTHING BELOW THIS LINE*************************
writename = "CLEANED_" + filename

def trie_regex_from_words(words):
    trie = Trie()
    for word in words:
        trie.add(word)
    return re.compile(r"\b" + trie.pattern() + r"\b", re.IGNORECASE)

#read in the terms to be cleaned

def MP_Cleaner(chunk):
    #read in the terms
    scrub_df = pd.read_csv(scrubfile, dtype='str')
    #pull just the items
    my_scrub = scrub_df['items'].tolist()

    #the chars we must protect:  \ . ^ $ * + ? { } [ ] | ( ) " '
    SpecialCharacters = [chr(92), chr(46), chr(94), chr(36), chr(42), chr(43),
                         chr(63), chr(123), chr(125), chr(91), chr(93),
                         chr(124), chr(40), chr(41), chr(34), chr(39)]

    #walk through the terms and prefix each special character with the escape
    # character so the terms are treated literally in the regex
    # (backslash comes first in the list so later escapes aren't double-escaped)
    for i in range(len(SpecialCharacters)):
        replacement = chr(92) + SpecialCharacters[i]
        my_scrub = [term.replace(SpecialCharacters[i], replacement) for term in my_scrub]

    Trie_Scrub = trie_regex_from_words(my_scrub)

    for elem in cols:
        chunk[elem].replace(Trie_Scrub, value=redact, regex=True, inplace=True)

    return chunk

def parallelize(data, func):
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    #Pool.map returns results in input order, so concat preserves row order
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

start_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime())

df = pd.read_csv(filename, dtype='str')
if __name__ == '__main__':
    df_done = parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=None)

    #that is it, all processing is done and the file should be saved
    print("Job Complete, " + writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime())
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed = input("Press Enter then close the window to exit:")
    print(proceed)
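(The trie module imported at the top is not in the standard library; the Trie class comes from the linked answer to "Speed up millions of regex replacements in Python 3". A minimal stand-in with the same add()/pattern() interface, shown only to make the script self-contained, could be an alternation sorted longest-first. It skips the prefix compression that makes the real trie version fast on huge term lists, and it assumes the words are already regex-escaped, as they are by the time MP_Cleaner calls it:)

class Trie:
    #minimal stand-in, not the optimized trie from the linked answer
    def __init__(self):
        self.words = []

    def add(self, word):
        self.words.append(word)

    def pattern(self):
        #longest-first so shorter terms can't shadow longer ones, since
        # Python's re alternation takes the first match, not the longest;
        # words are assumed to be pre-escaped by the caller
        ordered = sorted(set(self.words), key=len, reverse=True)
        return "(?:" + "|".join(ordered) + ")"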

Regarding python - Multiprocessing rows in Pandas to replace with REGEX, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60020936/
