gpt4 book ai didi

python - pty多路复用器

转载 作者:IT王子 更新时间:2023-10-29 00:51:49 27 4
gpt4 key购买 nike

我正在尝试对 Linux 上的串行端口进行多路复用访问。我正在使用只有一个串行端口的嵌入式系统,如果有多个进程与之通信会很好。

常见的用例是:

  • 一个运行测试的主程序(发送命令和接收输出);
  • 另一个记录所有串行端口事件;
  • 打开用户终端以发送其他命令和/或在测试过程中出现错误后执行事后分析。

首先,我制作了一个简单的 python 脚本来打开 n 个伪终端对(加上串行端口)并使用 poll 语句将输入/输出定向到正确的位置:

# Removed boiler plate and error checking for clarity

##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking

##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
master, slave = os.openpty()
# Print slave names so others know where to connect
print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
pts.append(master)

##### Poller setup
poller = select.poll()
poller.register(ttyS.fd, select.POLLIN | select.POLLPRI)
for pt in pts:
poller.register(pt, select.POLLIN | select.POLLPRI)

##### MAIN
while True:

events = poller.poll(500)

for fd, flag in events:

# fd has input
if flag & (select.POLLIN | select.POLLPRI):
# Data on serial
if fd == ttyS.fd:
data = ttyS.read(80)
for pt in pts:
os.write(pt, data)

# Data on other pty
else:
ttyS.write(os.read(fd, 80))

如果每个 pty 都已连接,则此方法非常有效。如果有一些未连接的 pty,最终它的缓冲区会填满并在写入时阻塞。似乎我需要知道哪些奴隶已连接或某种按需 pty 开放。

我在 this question 上发现了一个巧妙的技巧,这家伙只需要从串口部分读取数据,所以我修改了我的脚本:

##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking

##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
master, slave = os.openpty()

# slaves
print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
os.close(slave) # POLLHUP trick

# masters
pts.append(master)

##### Poller setup
reader = select.poll()
writer = select.poll()

reader.register(ttyS, select.POLLIN | select.POLLPRI)
for pt in pts:
reader.register(pt, select.POLLIN | select.POLLPRI)
writer.register(pt, select.POLLIN | select.POLLPRI | select.POLLOUT)

def write_to_ptys(data):
events = writer.poll(500)

for fd, flag in events:

# There is someone on the other side...
if not (flag & select.POLLHUP):
os.write(fd, data)

##### MAIN
while True:
events = reader.poll(500)

for fd, flag in events:

if flag & (select.POLLIN | select.POLLPRI):
# Data on serial
if fd == ttyS.fd:
write_to_tty(ttyS.read(80))
# Data on other pty
else:
ttyS.write(os.read(fd, 80))

这有效,但使用了 100% 的 CPU,因为读者轮询充满了 POLLHUP 事件。

我想如果我使用 TCP 套接字而不是伪终端,我可以得到我想要的东西。缺点是我必须修改所有其他已经与终端一起使用的脚本才能使用套接字(我知道我可以使用 socat,我只是想要更简单的东西)。此外,还有所有网络开销...

那么,有什么想法吗?

我不介意使用其他工具,只要设置简单即可。我也不介意使用其他语言,我就是最喜欢 Python。

最佳答案

最后我写了一个简单的 TCP 服务器,就像我说的我不想...虽然它工作得很好。它使用与问题代码相同的通用架构,但使用 TCP 套接字而不是伪终端。

我发了here以防万一有人想使用它。

调用它:

md:mux_serial> ./mux_server.py --device /dev/ttyS0 --baud 115200 --port 23200
MUX > Serial port: /dev/ttyS0 @ 115200
MUX > Server: localhost:23200

我在另一个终端上使用socat直接访问端口...

md:~> socat -,raw,echo=0,escape=0x0f TCP4:localhost:23200

...或者创建一个伪终端来使用需要这些的内部脚本:

md:~> socat -d -d pty,raw,echo=0 TCP4:localhost:23200
2012/10/01 13:08:21 socat[3798] N PTY is /dev/pts/4
2012/10/01 13:08:21 socat[3798] N opening connection to AF=2 127.0.0.1:23200
2012/10/01 13:08:21 socat[3798] N successfully connected from local address AF=2 127.0.0.1:35138
2012/10/01 13:08:21 socat[3798] N starting data transfer loop with FDs [3,3] and [5,5]

关于python - pty多路复用器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12518559/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com