- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我正在尝试创建一个类,它可以运行一个单独的进程来完成一些需要很长时间的工作,从一个主模块启动一堆这些,然后等待它们全部完成。我想启动一次流程,然后继续为它们提供要做的事情,而不是创建和破坏流程。例如,也许我有 10 台服务器运行 dd 命令,然后我希望它们都 scp 文件等。
我的最终目标是为每个系统创建一个类,以跟踪与其关联的系统的信息,例如 IP 地址、日志、运行时等。但是该类必须能够启动系统命令然后在系统命令运行时将执行返回给调用者,以便稍后跟进系统命令的结果。
我的尝试失败了,因为我无法通过管道将类的实例方法通过 pickle 发送到子进程。那些是不可腌制的。因此,我尝试以各种方式修复它,但我无法弄清楚。如何修补我的代码来做到这一点?如果你不能发送任何有用的东西,多处理有什么好处?
是否有关于多处理与类实例一起使用的良好文档?我可以让多处理模块工作的唯一方法是使用简单的功能。每次在类实例中使用它的尝试都失败了。也许我应该改为传递事件?我还不明白该怎么做。
import multiprocessing
import sys
import re
class ProcessWorker(multiprocessing.Process):
"""
This class runs as a separate process to execute worker's commands in parallel
Once launched, it remains running, monitoring the task queue, until "None" is sent
"""
def __init__(self, task_q, result_q):
multiprocessing.Process.__init__(self)
self.task_q = task_q
self.result_q = result_q
return
def run(self):
"""
Overloaded function provided by multiprocessing.Process. Called upon start() signal
"""
proc_name = self.name
print '%s: Launched' % (proc_name)
while True:
next_task_list = self.task_q.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % (proc_name)
self.task_q.task_done()
break
next_task = next_task_list[0]
print '%s: %s' % (proc_name, next_task)
args = next_task_list[1]
kwargs = next_task_list[2]
answer = next_task(*args, **kwargs)
self.task_q.task_done()
self.result_q.put(answer)
return
# End of ProcessWorker class
class Worker(object):
"""
Launches a child process to run commands from derived classes in separate processes,
which sit and listen for something to do
This base class is called by each derived worker
"""
def __init__(self, config, index=None):
self.config = config
self.index = index
# Launce the ProcessWorker for anything that has an index value
if self.index is not None:
self.task_q = multiprocessing.JoinableQueue()
self.result_q = multiprocessing.Queue()
self.process_worker = ProcessWorker(self.task_q, self.result_q)
self.process_worker.start()
print "Got here"
# Process should be running and listening for functions to execute
return
def enqueue_process(target): # No self, since it is a decorator
"""
Used to place an command target from this class object into the task_q
NOTE: Any function decorated with this must use fetch_results() to get the
target task's result value
"""
def wrapper(self, *args, **kwargs):
self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
return wrapper
def fetch_results(self):
"""
After all processes have been spawned by multiple modules, this command
is called on each one to retreive the results of the call.
This blocks until the execution of the item in the queue is complete
"""
self.task_q.join() # Wait for it to to finish
return self.result_q.get() # Return the result
@enqueue_process
def run_long_command(self, command):
print "I am running number % as process "%number, self.name
# In here, I will launch a subprocess to run a long-running system command
# p = Popen(command), etc
# p.wait(), etc
return
def close(self):
self.task_q.put(None)
self.task_q.join()
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(5):
worker = Worker(config, index)
worker.run_long_command("ls /")
workers.append(worker)
for worker in workers:
worker.fetch_results()
# Do more work... (this would actually be done in a distributor in another class)
for worker in workers:
worker.close()
编辑:我尝试将 ProcessWorker
类和多处理队列的创建移到 Worker
类之外,然后尝试手动腌制 worker 实例。即使这样也不起作用,我得到一个错误
RuntimeError: Queue objects should only be shared between processes through inheritance
。但我只是将这些队列的引用传递给工作实例?我缺少一些基本的东西。这是主要部分的修改代码:
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(1):
task_q = multiprocessing.JoinableQueue()
result_q = multiprocessing.Queue()
process_worker = ProcessWorker(task_q, result_q)
worker = Worker(config, index, process_worker, task_q, result_q)
something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues??
process_worker.start()
worker.run_long_command("ls /")
最佳答案
所以,问题在于我假设 Python 正在做某种与 C++/fork() 工作方式不同的魔法。我不知何故认为 Python 只是复制了类,而不是将整个程序复制到一个单独的进程中。我严重浪费了几天试图让它工作,因为所有关于泡菜序列化的讨论让我认为它实际上是通过管道发送所有内容。我知道某些东西不能通过管道发送,但我认为我的问题是我没有正确包装东西。
如果 Python 文档让我从 10,000 英尺的角度了解使用此模块时会发生什么,这一切都是可以避免的。当然,它告诉我多进程模块的方法是做什么的,并给了我一些基本的例子,但我想知道的是幕后的“操作理论”是什么!这是我可以使用的信息。如果我的回答是关闭的,请插话。它将帮助我学习。
当您使用此模块运行启动进程时,整个程序会被复制到另一个进程中。但由于它不是“__main__
”进程并且我的代码正在检查它,它不会无限地启动另一个进程。它只是停下来,坐在外面等着做某事,就像僵尸一样。在调用 multiprocess.Process() 时在父级中初始化的所有内容都已设置好并准备就绪。一旦您将某些内容放入 multiprocess.Queue 或共享内存或管道等(无论您正在通信),那么单独的进程就会接收它并开始工作。它可以利用所有导入的模块并进行设置,就好像它是父模块一样。但是,一旦父进程或单独进程中的某些内部状态变量发生更改,这些更改就会被隔离。一旦进程产生,现在你的工作就是在必要时通过队列、管道、共享内存等保持它们同步。
我扔掉了代码并重新开始,但现在我只在 ProcessWorker
中添加了一个额外的函数,这是一种运行命令行的“执行”方法。很简单。我不必担心以这种方式启动然后关闭一堆进程,这给我带来了过去在 C++ 中的各种不稳定和性能问题。当我切换到一开始启动进程,然后将消息传递给那些等待的进程时,我的性能提高了,而且非常稳定。
顺便说一句,我查看了这个链接以获得帮助,这让我很失望,因为这个例子让我认为方法正在跨队列传输:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html第一部分的第二个示例使用“next_task()”,它(对我来说)似乎正在执行通过队列接收的任务。
关于python - 如何在 Python 中对类实例使用多处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14169550/
对于 Metal ,如果对主纹理进行 mipmap 处理,是否还需要对多采样纹理进行 mipmap 处理?我阅读了苹果文档,但没有得到任何相关信息。 最佳答案 Mipmapping 适用于您将从中
我正在使用的代码在后端 Groovy 代码中具有呈现 GSP(Groovy 服务器页面)的 Controller 。对于前端,我们使用 React-router v4 来处理路由。我遇到的问题是,通过
我们正在 build 一个巨大的网站。我们正在考虑是在服务器端(ASP .Net)还是在客户端进行 HTML 处理。 例如,我们有 HTML 文件,其作用类似于用于生成选项卡的模板。服务器端获取 HT
我正在尝试将图像加载到 void setup() 中的数组中,但是当我这样做时出现此错误:“类型不匹配,'processing .core.PImage' does not匹配“processing.
我正在尝试使用其私有(private)应用程序更新 Shopify 上的客户标签。我用 postman 尝试过,一切正常,但通过 AJAX,它带我成功回调而不是错误,但成功后我得到了身份验证链接,而不
如何更改我的 Processing appIconTest.exe 导出的默认图标在窗口中的应用程序? 默认一个: 最佳答案 经过一些研究,我能找到的最简单的解决方案是: 进入 ...\process
我在 Processing 中做了一个简单的小游戏,但需要一些帮助。我有一个 mp3,想将它添加到我的应用程序中,以便在后台循环运行。 这可能吗?非常感谢。 最佳答案 您可以使用声音库。处理已经自带
我有几个这样创建的按钮: 在 setup() PImage[] imgs1 = {loadImage("AREA1_1.png"),loadImage("AREA1_2.png"),loadImage
我正在尝试使用 Processing 创建一个多人游戏,但无法弄清楚如何将屏幕分成两个以显示玩家的不同情况? 就像在 c# 中一样,我们有Viewport leftViewport,rightView
我一直在尝试使用 Moore 邻域在处理过程中创建元胞自动机,到目前为止非常成功。我已经设法使基本系统正常工作,现在我希望通过添加不同的功能来使用它。现在,我检查细胞是否存活。如果是,我使用 fill
有没有办法用 JavaScript 代码检查资源使用情况?我可以检查脚本的 RAM 使用情况和 CPU 使用情况吗? 由于做某事有多种方法,我可能会使用不同的方法编写代码,并将其保存为两个不同的文件,
我想弄清楚如何处理这样的列表: [ [[4,6,7], [1,2,4,6]] , [[10,4,2,4], [1]] ] 这是一个整数列表的列表 我希望我的函数将此列表作为输入并返回列表中没有重复的整
有没有办法在不需要时处理 MethodChannel/EventChannel ?我问是因为我想为对象创建多个方法/事件 channel 。 例子: class Call { ... fields
我有一个关于在 Python3 中处理 ConnectionResetError 的问题。这通常发生在我使用 urllib.request.Request 函数时。我想知道如果我们遇到这样的错误是否可
我一直在努力解决这个问题几个小时,但无济于事。代码很简单,一个弹跳球(粒子)。将粒子的速度初始化为 (0, 0) 将使其保持上下弹跳。将粒子的初始化速度更改为 (0, 0.01) 或任何十进制浮点数都
我把自己弄得一团糟。 我想在我的系统中添加 python3.6 所以我决定在我的 Ubuntu 19.10 中卸载现有的。但是现在每次我想安装一些东西我都会得到这样的错误: dpkg: error w
我正在努力解决 Rpart 包中的 NA 功能。我得到了以下数据框(下面的代码) Outcome VarA VarB 1 1 1 0 2 1 1 1
我将 Java 与 JSF 一起使用,这是 Glassfish 3 容器。 在我的 Web 应用程序中,我试图实现一个文件(图像)管理系统。 我有一个 config.properties我从中读取上传
所以我一直在Processing工作几个星期以来,虽然我没有编程经验,但我已经转向更复杂的项目。我正在编写一个进化模拟器,它会产生具有随机属性的生物。 最终,我将添加复制,但现在这些生物只是在屏幕上漂
有人知道 Delphi 2009 对“with”的处理有什么不同吗? 我昨天解决了一个问题,只是将“with”解构为完整引用,如“with Datamodule、Dataset、MainForm”。
我是一名优秀的程序员,十分优秀!