我写了一个简单的 python 多处理,它从 csv 中读取一堆行,调用 api,然后写入新的 csv。但是,我看到的是该程序的性能与顺序执行相同。更改池大小没有任何影响。出了什么问题?
from multiprocessing import Pool
from random import randint
from time import sleep
import csv
import requests
import json
def orders_v4(order_number):
response = requests.request("GET", url, headers=headers, params=querystring, verify=False)
return response.json()
newcsvFile=open('gom_acr_status.csv', 'w')
writer = csv.writer(newcsvFile)
def process_line(row):
ol_key = row['\ufeffORDER_LINE_KEY']
order_number=row['ORDER_NUMBER']
orders_json = orders_v4(order_number)
oms_order_key = orders_json['oms_order_key']
order_lines = orders_json["order_lines"]
for order_line in order_lines:
if ol_key==order_line['order_line_key']:
print(order_number)
print(ol_key)
ftype = order_line['fulfillment_spec']['fulfillment_type']
status_desc = order_line['statuses'][0]['status_description']
print(ftype)
print(status_desc)
listrow = [ol_key, order_number, ftype, status_desc]
#(writer)
writer.writerow(listrow)
newcsvFile.flush()
def get_next_line():
with open("gom_acr.csv", 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
f = get_next_line()
t = Pool(processes=50)
for i in f:
t.map(process_line, (i,))
t.join()
t.close()
编辑:我刚刚注意到您在循环中调用了 map
。你只需要调用一次。 is 是一个阻塞函数,它不是异步的!查看the docs有关正确用法的示例。
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
原答案:
所有进程写入输出文件的事实导致文件系统争用。
如果您的 process_line
函数只返回行(例如作为字符串列表),那么主进程将在 map
返回它们之后写入所有这些行,那么您应该会体验到性能提升。
还有 2 个注释:
- 尝试不同数量的进程,从# of cores 开始,然后再增加。也许 50 太多了。
- 在每个流程中完成的工作似乎(对我来说,乍一看)很短,产生新流程和编排它们的开销可能太大而无法完成手头的任务。
我是一名优秀的程序员,十分优秀!