gpt4 book ai didi

python - 如何使用并发.futures.ThreadPoolExecutor 或 multiprocessing.pool.ThreadPool 将某些变量绑定(bind)到线程?

转载 作者:行者123 更新时间:2023-11-30 22:31:06 26 4
gpt4 key购买 nike

我想做的是这样的:

class MyThread(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)

def run(self):
for i in range(3):
print("current id: {}".format(self._id))

def main():
ts = []
for i in range(5):
t = MyThread("localhost", 3001)
t.start()
ts.append(t)

for t in ts:
t.join()

我得到了这些输出:

current id: 10
current id: 10
current id: 13
current id: 43
current id: 13
current id: 10
current id: 83
current id: 83
current id: 83
current id: 13
current id: 98
current id: 43
current id: 98
current id: 43
current id: 98

这个输出就是我想要的。正如你所看到的,我的_id在不同线程中是不同的,但在单线程中,我共享相同的_id 。(_id只是这些变量之一,我还有很多其他类似的变量)。

现在,我想对 multiprocessing.pool.ThreadPool 做同样的事情

class MyProcessor():
def __init__(self, host, port):
# self._sock = self.initsocket(host, port)
self._id = random.randint(0, 100)

def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i

def main():
with ThreadPool(5) as p:
p.map(MyProcessor("localhost", 3001), range(15))

但是现在_id将被所有线程共享:

current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58
current id: 58

对于concurrent.futures.ThreadPoolExecutor,我也尝试做同样的事情:

class MyProcessor():
def __init__(self, host, port):
# self.initsocket(host, port)
self._id = random.randint(0, 100)

def __call__(self, i):
print("current id: {}".format(self._id))
return self._id * i

def main():
with ThreadPoolExecutor(max_workers=5) as executor:
func = MyProcessor("localhost", 3001)
futures = [executor.submit(func, i) for i in range(15)]
for f in as_completed(futures):
pass

输出是这样的:

current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94
current id: 94

当然,我得到这个结果并不奇怪,因为我只调用了一次__init__。但我要问的是:

如何使用 concurrent.futures.ThreadPoolExecutormultiprocessing.pool.ThreadPool 做同样的事情(并且也请不要再使用全局变量)。

最佳答案

这里出现了几个问题,我会尽力解决所有这些问题。

在您给出的第一个示例中,您可以完全控制您创建的所有Thread,因此每个线程在初始值设定项中都会获得一个唯一的 ID。当然,问题在于您一次启动所有线程,这对于大量线程来说可能非常低效。

在问题中的两个线程池示例中,您都为可调用对象初始化了一次 ID,因此每个线程当然没有单独的 ID。正确的方法是在 __call__ 方法中初始化每个线程的 ID:

class MyProcessor():    def __init__(self, host, port):        self.initsocket(host, port)    def __call__(self, i):        id_ = random.randint(0, 100)        print("current id: {}".format(id_))        return id_ * idef main():    func = MyProcessor("localhost", 3001)    with ThreadPoolExecutor(max_workers=5) as executor:        collections.deque(executor.map(MyProcessor, range(15)), maxlen=0)

Notice that you can shorten the concurrent.futures.ThreadPoolExecutor example by using the map method there as well, if all you care about is the final result and not the intermediate Future objects. The deque(..., maxlen=0) call is a standard idiom for consuming an iterator.

Given the gist you linked to in your comments, I understand why you want to have thread-local data. However, you certainly do not need a global variable to achieve that result. Here are a couple of alternatives:

  1. Just add your thread-local data to self in the initializer, and voila, it is accessible to all calls without being global:

    def __init__(self, host, port):
    self.thread_local = threading.local()

    def __call__(self, i):
    try:
    id_ = self.thread_local.id_
    except AttributeError:
    id_ = random.randint(0, 100)
    ...
  2. 使用函数本地数据而不是线程本地数据。您正在使用线程本地数据来避免将连接(要点中)传递给某些私有(private)函数。这不是真正的需要,只是一种审美选择。您始终可以使用 def _send_data(self, conn, **kwargs)def _recv_data(self, conn),因为连接实际来自的唯一位置是 >__call__ 无论如何。

虽然在某些情况下可能会出现选项#1,但我强烈建议您不要将其与任何类型的线程池管理器一起使用。线程池可以重用相同的线程来按顺序运行任务提交到的队列中的任务。这意味着您最终将在本应打开自己的任务中获得相同的连接。在您最初的示例中,您独立创建所有线程,这本来就很好,但是当您在回收池线程上多次调用 MyProcessor 时,它可能就不好了。

关于python - 如何使用并发.futures.ThreadPoolExecutor 或 multiprocessing.pool.ThreadPool 将某些变量绑定(bind)到线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45901594/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com