gpt4 book ai didi

python - 在python中合并多个阻塞生成器函数

转载 作者:行者123 更新时间:2023-11-28 22:44:49 25 4
gpt4 key购买 nike

我有两个迭代器。每个代表来自阻塞资源(如套接字)的可能无限的数据流。

我想合并两个迭代器中的数据,按照它到达的顺序——即非确定性的。更详细地说,如果我有迭代器 iter1iter2,我希望我的结果是一个等同于 merged 的迭代器。

iter1 : 1 2 3     4   5 ...
iter2 : 1 2 3 ...
merged: 1 2 3 1 2 4 3 5 ...

--- > increasing time --->

我假设我需要一个并发程序,但我不确定是否有 pythonic 方式来执行此操作。我非常希望找到适用于 Python 2.6 的答案。

例如,假设我有两个迭代器,它们“在引擎盖下”从套接字读取数据。这是一个快速的服务器“监听器”,它反复回显客户端连接的日期/时间:

==> message.sh <==
#!/usr/bin/env bash
set -e;

# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
echo $MSG;
sleep 1;
done

==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"

您可以使用 ./server.sh 运行服务器。

下面是一个示例 python 脚本,它试图合并来自两个套接字的消息。然而,这是不正确的——它必须从每个迭代器接收一个值才能继续。使用上面的示例,“合并”结果将是:

iter1 : 1 2 3     4   5 ...
iter2 : 1 2 3 ...
merged: 1 1 2 2 3 3 4 ...

这是脚本:

#!/usr/bin/env python2
import socket
import time

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
while True:
yield sock.recv(1024)


def merge(xs, ys):
iters = [xs, ys]
while True:
for it in iters:
try:
i = it.next()
yield i
except StopIteration:
pass

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
print msg,

最后:我从一个库中获取了迭代器,所以为了这个问题的目的,请假设我必须处理迭代器,并且我不能做一些事情,比如将套接字设置为非阻塞和轮询。

最佳答案

您可以将套接字迭代移动到后台线程中,然后使用Queue 将每个接收到的数据发送到您的主线程。然后你的主线程就可以在队列中使用数据:

import socket
import time
from Queue import Queue
from threading import Thread

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
while True:
data = sock.recv(1024)
yield data
if not data: # End of the stream
return

def consume(q, s):
for i in s:
q.put(i)

def merge(xs, ys):
q = Queue()
iters = [xs, ys]
for it in iters:
t = Thread(target=consume, args=(q, it))
t.start()

done = 0
while True:
out = q.get()
if out == '': # End of the stream.
done += 1
if done == len(iters): # When all iters are done, break out.
return
else:
yield out

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
print msg,

关于python - 在python中合并多个阻塞生成器函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29130833/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com