gpt4 book ai didi

Tornado TCP 服务器/客户端进程通信

转载 作者:行者123 更新时间:2023-12-03 09:12:59 26 4
gpt4 key购买 nike

我想在多个 Tornado 进程之间建立通信,每个进程都充当网络服务器,即使用tornado.web.RequestHandler。我的想法是,我想要进程之间有一个完全网状的网络。我有 4 个进程,我想使用 tornado.tcpservertornado.tcpclient 在它们之间建立持续的永久通信:

T1---T2
| \ /|
| \/ |
| / \ |
T3---T4

我是 TCP 编程新手,但是在我在 Tornado 文档中看到的示例中:http://www.tornadoweb.org/en/stable/iostream.htmltornado.iostream.IOStream类的实现下,一旦建立了套接字,所有通信都会完成,然后套接字就会关闭。该示例通过具有回调的 block 来驱动代码,每个回调执行其通信职责。

但是,是否可以打开 TCP 连接并让 BaseIOStream.read_until_close() 空闲并仅在客户端写入服务器时调用?

换句话说,客户端和服务器保持连接,当客户端写入服务器时,它会以某种方式中断Tornado IOLoop以调用read()?

或者我的想法被误导了,这样做的方法是每次我需要进程进行通信时,我都会建立一个新的 TCP 连接,完成工作,然后终止连接?似乎每次建立这个新连接都会包含大量开销,而不是保持连接打开......

最佳答案

这是一个基本的实现。 (我不能保证它的生产质量!)将其保存到文件中并执行类似的操作,每个操作都在不同的终端窗口中:

> python myscript.py 10001 10002 10003
> python myscript.py 10002 10003 10001
> python myscript.py 10003 10001 10002

第一个参数是监听端口,其余参数是其他服务器的端口。

import argparse
import logging
import os
import random
import socket
import struct

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, StreamClosedError
from tornado.tcpclient import TCPClient
from tornado.tcpserver import TCPServer
from tornado.options import options as tornado_options


parser = argparse.ArgumentParser()
parser.add_argument("port", type=int, help="port to listen on")
parser.add_argument("peers", type=int, nargs="+", help="peers' ports")
opts = parser.parse_args()

# This is just to configure Tornado logging.
tornado_options.parse_command_line()
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)

# Cache this struct definition; important optimization.
int_struct = struct.Struct("<i")
_UNPACK_INT = int_struct.unpack
_PACK_INT = int_struct.pack

tcp_client = TCPClient()


@gen.coroutine
def client(port):
while True:
try:
stream = yield tcp_client.connect('localhost', port)
logging.info("Connected to %d", port)

# Set TCP_NODELAY / disable Nagle's Algorithm.
stream.set_nodelay(True)

while True:
msg = ("Hello from port %d" % opts.port).encode()
length = _PACK_INT(len(msg))
yield stream.write(length + msg)
yield gen.sleep(random.random() * 10)

except StreamClosedError as exc:
logger.error("Error connecting to %d: %s", port, exc)
yield gen.sleep(5)


loop = IOLoop.current()

for peer in opts.peers:
loop.spawn_callback(client, peer)


class MyServer(TCPServer):
@gen.coroutine
def handle_stream(self, stream, address):
logging.info("Connection from peer")
try:
while True:
# Read 4 bytes.
header = yield stream.read_bytes(4)

# Convert from network order to int.
length = _UNPACK_INT(header)[0]

msg = yield stream.read_bytes(length)
logger.info('"%s"' % msg.decode())

del msg # Dereference msg in case it's big.

except StreamClosedError:
logger.error("%s disconnected", address)


server = MyServer()
server.listen(opts.port)

loop.start()

请注意,我们不会调用 read_until_close,因此我们需要某种方法来知道消息何时被完全接收。我在每条消息的开头使用一个 32 位整数来执行此操作,该整数对消息其余部分的长度进行编码。

您问,“当客户端写入服务器时,它会以某种方式中断 Tornado IOLoop 来调用 read()?”这就是 Tornado 的 IOLoop 的用途,也是我们所说的“异步”的意思:许多 Tornado 协程或回调可以等待网络事件,并且 IOLoop 在它们等待的事件发生时唤醒它们。这就是您在上面的代码中看到“yield”的地方所发生的情况。

关于Tornado TCP 服务器/客户端进程通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40239901/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com