gpt4 book ai didi

ZeroMQ 和本地 FIFO

转载 作者:行者123 更新时间:2023-12-05 00:31:15 27 4
gpt4 key购买 nike

我有两个进程(“发送者”和“接收者”)需要通过 transient 单向 FIFO 通信管道在单台机器上本地进行通信。这是我想要发生的事情(使用更接近 Unix 域套接字的语言):

  • 发送者在已知地址“创建”管道,并立即将消息发送到
  • 在某个时刻(在发送方“创建”管道之前或之后),接收方连接到管道
  • 阅读器从管道中读取消息
  • 发件人“关闭”管道
  • 读者注意到所有消息都已阅读(可能管道已关闭)

  • 我的问题是:我如何用 ZeroMQ 实现这个? “PUB/SUB”,“推/拉”?在 ZMQ 套接字中检测“数据结束”的机制是什么?是否可以同时允许上述前两项的排序:即发送方或接收方是否先尝试连接?如果是这样,怎么做?

    谢谢。

    最佳答案

    关于 zeromq 的一些知识:

  • 绑定(bind)/连接顺序通常不重要
  • 当一个对等方应接收每条消息和/或不应丢弃消息时使用推/拉
  • PUB/SUB 用于所有对等方都应接收消息和/或应丢弃在无人收听时发送的消息。
  • ZeroMQ 故意从应用程序代码中隐藏连接/断开打开/关闭事件,因此您无法检测到实际的关闭事件。

  • 你需要知道的一件事,你不应该知道:当一个套接字连接时,它会创建一个管道(对等点不需要存在)。当一个套接字绑定(bind)时,它只在对等点连接时创建管道。这些管道控制套接字的 HWM 行为。这意味着没有对等点的连接套接字和没有对等点的绑定(bind)套接字的行为是不同的。如果您尝试使用它发送消息,则没有对等点的绑定(bind)套接字将阻塞,而连接套接字将愉快地将消息排队在内存中,直到对等点到达并开始消费消息。

    基于这些点,你想要做的是:
  • 使用推/拉
  • 接收方应绑定(bind)
  • 发送一条特殊的“关闭”消息,指示队列已完成,而不是检测 tcp/ipc 级别的关闭事件。

  • 这是 Python 中的一个工作示例,它使用 IPC 套接字(文件)进行通信,其中接收方在发送方之后的某个时间开始。

    双方需要知道的共同信息:
    import time

    import zmq

    # the file used for IPC communication
    PIPE = '/tmp/fifo-pipe'

    # command flags for our tiny message protocol
    DONE = b'\x00'
    MSG = b'\x01'

    接收者 (PULL) 绑定(bind),并消耗直到 DONE
    def receiver():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    s.bind("ipc://%s" % PIPE)
    while True:
    parts = s.recv_multipart()
    cmd = parts[0]
    if cmd == DONE:
    print "[R] received DONE"
    break
    msg = parts[1]
    # handle the message
    print "[R] %.1f consuming %s" % (time.time() - t0, msg)
    s.close()
    ctx.term()
    print "[R] done"

    发送方(PUSH)连接并发送,发送 DONE 表示完成
    def sender():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    s.connect("ipc://%s" % PIPE)

    for i in range(10):
    msg = b'msg %i' % i
    print "[S] %.1f sending %s" % (time.time() - t0, msg)
    s.send_multipart([MSG, msg])
    time.sleep(1)
    print "[S] sending DONE"
    s.send(DONE)
    s.close()
    ctx.term()
    print "[S] done"

    和一个演示脚本一起运行它们,发送者首先启动,接收者在发送者已经发送了几条消息之后启动:
    from threading import Thread

    # global t0, just for keeping times relative to start, rather than 1970
    t0 = time.time()

    # start the sender
    s = Thread(target=sender)
    s.start()

    # start the receiver after a delay
    time.sleep(5)
    r = Thread(target=receiver)
    r.start()

    # wait for them both to finish
    s.join()
    r.join()

    可以看到一起运行 here .

    关于ZeroMQ 和本地 FIFO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15166039/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com