gpt4 book ai didi

python - I/O 密集型串行端口应用程序 : Porting from Threading, 基于队列的异步设计(ala Twisted)

转载 作者:太空狗 更新时间:2023-10-30 02:23:18 25 4
gpt4 key购买 nike

因此,我一直在为通过串行 (RS-232)“Master”与无线设备通信的客户端开发一个应用程序。我目前已经使用线程编写了应用程序的核心(如下)。我一直在 #python 上注意到,共识似乎是使用线程和使用 Twisted 的异步通信能力。

我还没有找到任何使用 twisted 进行串行端口异步 I/O 通信的好例子。但是,我找到了Dave Peticolas' 'Twisted Introduction' (感谢 nosklo)我目前正在研究,但是,它使用套接字而不是串行通信(但是异步概念肯定得到了很好的解释)。

我将如何使用线程、队列将此应用程序移植到 Twisted?有什么优点/缺点吗(我注意到,有时,如果一个线程挂起,它会导致系统 BSOD)?

代码 (msg_poller.py)

from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc

gc.enable()
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'

INITIAL_MODBUS = 0xFFFF


class Poller:
"""
Connects to the serial port and polls nodes for data.
Reads response from node(s) and loads that data into queue.
Parses qdata and writes that data to database.
"""

def __init__(self,
port,
baudrate,
parity,
rtscts,
xonxoff,
echo=False):
try:
self.serial = serial.serial_for_url(port,
baudrate,
parity=parity,
rtscts=rtscts,
xonxoff=xonxoff,
timeout=.01)
except AttributeError:
self.serial = serial.Serial(port,
baudrate,
parity=parity,
rtscts=rtscts,
xonxoff=xonxoff,
timeout=.01)
self.com_data_q = None
self.com_error_q = None
self.livefeed = LiveDataFeed()
self.timer = time.time()
self.dtr_state = True
self.rts_state = True
self.break_state = False

def start(self):
self.data_q = Queue.Queue()
self.error_q = Queue.Queue()
com_error = get_item_from_queue(self.error_q)
if com_error is not None:
print 'Error %s' % (com_error)
self.timer = time.time()
self.alive = True

# start monitor thread
#
self.mon_thread = threading.Thread(target=self.reader)
self.mon_thread.setDaemon(1)
self.mon_thread.start()

# start sending thread
#
self.trans_thread = threading.Thread(target=self.writer)
self.trans_thread.setDaemon(1)
self.trans_thread.start()

def stop(self):
try:
self.alive = False
self.serial.close()
except (KeyboardInterrupt, SystemExit):
self.alive = False

def reader(self):
"""
Reads data from the serial port using self.mon_thread.
Displays that data on the screen.
"""
from rmsg_format import message_crc, message_format
while self.alive:
try:
while self.serial.inWaiting() != 0:

# Read node data from the serial port. Data should be 96B.

data = self.serial.read(96)
data += self.serial.read(self.serial.inWaiting())

if len(data) > 0:

# Put data in to the data_q object
self.data_q.put(data)
if len(data) == 96:
msg = self.data_q.get()

pw = ProtocolWrapper(
header=PROTOCOL_HEADER,
footer=PROTOCOL_FOOTER,
dle=PROTOCOL_DLE)
status = map(pw.input, msg)

if status[-1] == ProtocolStatus.IN_MSG:
# Feed all the bytes of 'msg' sequentially into pw.input

# Parse the received CRC into a 16-bit integer
rec_crc = message_crc.parse(msg[-4:]).crc

# Compute the CRC on the message
calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
from datetime import datetime
ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
if rec_crc != calc_crc:
print ts
print 'ERROR: CRC Mismatch'
print msg.encode('hex')
else:
#msg = message_format.parse(msg[1:])
#print msg.encode('hex') + "\r\n"
msg = message_format.parse(msg[1:])
print msg
#return msg
gc.collect()
time.sleep(.2)
except (KeyboardInterrupt, SystemExit, Exception, TypeError):
self.alive = False
self.serial.close()
raise

def writer(self):
"""
Builds the packet to poll each node for data.
Writes that data to the serial port using self.trans_thread
"""
import time
try:
while self.alive:
try:
dest_module_code = ['DRILLRIG',
'POWERPLANT',
'GENSET',
'MUDPUMP']
dest_ser_no = lambda x: x + 1
for code in dest_module_code:
if code != 'POWERPLANT':
msg = build_message_to_send(
data_len=0x10,
dest_module_code='%s' % (code),
dest_ser_no=dest_ser_no(0),
dest_customer_code='*****',
ret_ser_no=0x01,
ret_module_code='DOGHOUSE',
ret_customer_code='*****',
command='POLL_NODE',
data=[])
self.serial.write(msg)
time.sleep(.2)
gc.collect()
elif code == 'POWERPLANT':
msg = build_message_to_send(
data_len=0x10,
dest_module_code='POWERPLANT',
dest_ser_no=dest_ser_no(0),
dest_customer_code='*****',
ret_ser_no=0x01,
ret_module_code='DOGHOUSE',
ret_customer_code='*****',
command='POLL_NODE',
data=[])
self.serial.write(msg)
time.sleep(.2)
gc.collect()
msg = build_message_to_send(
data_len=0x10,
dest_module_code='POWERPLANT',
dest_ser_no=dest_ser_no(1),
dest_customer_code='*****',
ret_ser_no=0x01,
ret_module_code='DOGHOUSE',
ret_customer_code='*****',
command='POLL_NODE',
data=[])
self.serial.write(msg)
time.sleep(.2)
gc.collect()
except (KeyboardInterrupt, SystemExit):
self.alive = False
self.serial.close()
raise
except (KeyboardInterrupt, SystemExit):
self.alive = False
self.serial.close()
raise


def main():
poller = Poller(
port='COM4',
baudrate=115200,
parity=serial.PARITY_NONE,
rtscts=0,
xonxoff=0,
)
poller.start()
poller.reader()
poller.writer()
poller.stop()
if __name__ == '__main__':
main()

最佳答案

在线程/队列方法和使用扭曲的方法之间编写一个直接的一对一映射程序是非常困难的(如果不是不可能的话)。

我建议,掌握扭曲及其 react 器方式,它使用协议(protocol)和协议(protocol)特定方法。把它想象成当你使用延迟使用扭曲时,你已经使用线程和队列显式编码的所有异步事物都免费提供给你。

twisted 似乎确实在使用 SerialPort 传输类的 react 器上支持 SerialPort,基本结构似乎有点像这样。

from twisted.internet import reactor
from twisted.internet.serialport import SerialPort

SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate))
reactor.run()

在 YourProtocolClass() 中,您将处理特定于您的串行端口通信要求的各种事件。 doc/core/examples 目录包含 gpsfix.py 和 mouse.py 等示例。

关于python - I/O 密集型串行端口应用程序 : Porting from Threading, 基于队列的异步设计(ala Twisted),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4726391/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com