gpt4 book ai didi

python - 如何在 AWS Lambda 中模拟 multiprocessing.Pool.map()?

转载 作者:行者123 更新时间:2023-12-03 14:33:04 24 4
gpt4 key购买 nike

AWS Lambda 上的 Python 不支持 multiprocessing.Pool.map() ,如 this other question 中所述.请注意,另一个问题是问为什么它不起作用。这个问题不同,我问的是在缺乏底层支持的情况下如何模拟功能。

另一个问题的答案之一为我们提供了以下代码:

# Python 3.6
from multiprocessing import Pipe, Process

def myWorkFunc(data, connection):
result = None

# Do some work and store it in result

if result:
connection.send([result])
else:
connection.send([None])


def myPipedMultiProcessFunc():

# Get number of available logical cores
plimit = multiprocessing.cpu_count()

# Setup management variables
results = []
parent_conns = []
processes = []
pcount = 0
pactive = []
i = 0

for data in iterable:
# Create the pipe for parent-child process communication
parent_conn, child_conn = Pipe()
# create the process, pass data to be operated on and connection
process = Process(target=myWorkFunc, args=(data, child_conn,))
parent_conns.append(parent_conn)
process.start()
pcount += 1

if pcount == plimit: # There is not currently room for another process
# Wait until there are results in the Pipes
finishedConns = multiprocessing.connection.wait(parent_conns)
# Collect the results and remove the connection as processing
# the connection again will lead to errors
for conn in finishedConns:
results.append(conn.recv()[0])
parent_conns.remove(conn)
# Decrement pcount so we can add a new process
pcount -= 1

# Ensure all remaining active processes have their results collected
for conn in parent_conns:
results.append(conn.recv()[0])
conn.close()

# Process results as needed

是否可以修改此示例代码以支持 multiprocessing.Pool.map() ?

到目前为止我尝试了什么

我分析了上面的代码,我没有看到要执行的函数的参数或数据,所以我推断它没有执行与 multiprocessing.Pool.map()相同的功能。 .除了演示可以组装成解决方案的构建块之外,尚不清楚代码的作用。

这是“为我编写代码”的问题吗?

是的,在某种程度上,确实如此。这个问题影响了成千上万的 Python 开发人员,如果我们所有人共享相同的代码,而不是强制每个遇到这个问题的 SO 用户去开发,它对世界经济的效率会更高,温室气体排放更少,等等他们自己的解决方法。我希望我已经完成了我的工作,将这个问题提炼成一个明确的问题,并准备好了假定的构建块。

最佳答案

我能够在我自己的测试中使用它。
我的代码基于此链接:https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

NB1:你必须增加对 lambda 函数的内存分配 .使用默认的最小数量,多处理不会提高性能。我的帐户可以分配的最大值 (3008MB) 达到以下数字。

NB2:我在这里完全忽略了并行的最大进程。我的用法没有很多元素需要处理。

使用下面的代码,用法是:

work = funcmap(yourfunction,listofstufftoworkon)
yourresults = work.run()

从我的笔记本电脑运行:
jumper@jumperdebian[3333] ~/scripts/tmp  2019-09-04 11:52:30
└─ $ ∙ python3 -c "import tst; tst.lambda_handler(None,None)"
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 9.574460506439209
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 6.422513484954834

从 aws 运行:
Function Logs:
START RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97 Version: $LATEST
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
SP runtime : 12.135798215866089
results : [(35, 9227465), (35, 9227465), (35, 9227465), (35, 9227465)]
MP runtime : 7.293526887893677
END RequestId: 075a92c0-7c4f-4f48-9820-f394ee899a97

这是测试代码:
import time
from multiprocessing import Process, Pipe
import boto3

class funcmap(object):

fmfunction=None
fmlist=None

def __init__(self,pfunction,plist):
self.fmfunction=pfunction
self.fmlist=plist

def calculation(self, pfunction, pload, conn):
panswer=pfunction(pload)
conn.send([pload,panswer])
conn.close()

def run(self):
datalist = self.fmlist
processes = []
parent_connections = []
for datum in datalist:
parent_conn, child_conn = Pipe()
parent_connections.append(parent_conn)
process = Process(target=self.calculation, args=(self.fmfunction, datum, child_conn,))
processes.append(process)

pstart=time.time()
for process in processes:
process.start()
#print("starting at t+ {} s".format(time.time()-pstart))
for process in processes:
process.join()
#print("joining at t+ {} s".format(time.time()-pstart))

results = []
for parent_connection in parent_connections:
resp=parent_connection.recv()
results.append((resp[0],resp[1]))
return results


def fibo(n):
if n <= 2 : return 1
return fibo(n-1)+fibo(n-2)

def lambda_handler(event, context):
#worklist=[22,23,24,25,26,27,28,29,30,31,32,31,30,29,28,27,26,27,28,29]
#worklist=[22,23,24,25,26,27,28,29,30]
worklist=[30,30,30,30]
#worklist=[30]
_start = time.time()
results=[]
for a in worklist:
results.append((a,fibo(a)))
print("results : {}".format(results))
_end = time.time()
print("SP runtime : {}".format(_end-_start))

_mstart = time.time()
work = funcmap(fibo,worklist)
results = work.run()
print("results : {}".format(results))
_mend = time.time()
print("MP runtime : {}".format(_mend-_mstart))

希望能帮助到你。

关于python - 如何在 AWS Lambda 中模拟 multiprocessing.Pool.map()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56329799/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com