gpt4 book ai didi

go - 如何让 ZeroMQ 使出站排队但未在设定时间内发送的消息超时?

转载 作者:IT王子 更新时间:2023-10-29 01:44:43 31 4
gpt4 key购买 nike

我有一个在服务器上运行的应用程序,该应用程序从电话应用程序接收请求,然后在工作服务器之间对请求进行负载平衡。我正在尝试添加超时,以防主服务器上已在出站队列中超时长度的消息从队列中删除。更具体地说,主服务器上的应用程序是用 golang 编写的,并实现了 Paranoid Pirate Pattern。的负载平衡。我目前的代码是:

import (
"fmt"
zmq "github.com/pebbe/zmq4"
"time"
)

const (
HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1500 * time.Millisecond

MESSAGE_READY = "\001"
MESSAGE_HEARTBEAT = "\002"
)

var (
client *zmq.Socket
backend *zmq.Socket
frontend *zmq.Socket
workerPoller *zmq.Poller
brokerPoller *zmq.Poller
workerQueue []Worker
)

type Worker struct {
Id string
Expire time.Time
}

type RequestWrapper {
RequestToSend Request

}

func NewWorker(id string) Worker {
return Worker{
Id: id,
Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
}
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
fmt.Println(worker.Id, " joined")
for i, w := range workers {
if worker.Id == w.Id {
if i == 0 {
workers = workers[1:]
} else if i == len(workers)-1 {
workers = workers[:i]
} else {
workers = append(workers[:i], workers[i+1:]...)
}
break
}
}
return append(workers, worker)
}

func PurgeInactiveWorkers() {
now := time.Now()
for i, worker := range workerQueue {
if now.Before(worker.Expire) {
workerQueue = workerQueue[i:]
return
}
}

workerQueue = workerQueue[0:0]
}

func LoadBalance() {
// Loop:
heartbeat := time.Tick(HEARTBEAT_INTERVAL)
for {
var sockets []zmq.Polled

// If you have available workers, poll on the both front and backend
// If not poll on backend with infinite timeout
if len(workerQueue) > 0 {
sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
} else {
sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
}

for _, socket := range sockets {
switch socket.Socket {
// backend is a router
case backend:
workerId, _ := backend.Recv(0)
workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
clientId, _ := backend.Recv(0)
if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
route, _ := backend.Recv(0)
message, _ := backend.RecvBytes(0)

fmt.Println("Received response")
RouteResponse(route, message)

// frontend.Send(clientId, zmq.SNDMORE)
// frontend.Send("", zmq.SNDMORE)
// frontend.SendBytes(message, 0)
}
// frontend is a dealer
case frontend:
clientId, _ := frontend.Recv(0)
route, _ := frontend.Recv(0)
message, _ := frontend.RecvBytes(0)

backend.Send(workerQueue[0].Id, zmq.SNDMORE)
backend.Send(clientId, zmq.SNDMORE)
backend.Send(route, zmq.SNDMORE)
backend.SendBytes(message, 0)

workerQueue = workerQueue[1:]
}
}

select {
case <-heartbeat:
for _, worker := range workerQueue {
backend.Send(worker.Id, zmq.SNDMORE)
backend.Send(MESSAGE_HEARTBEAT, 0)
}
break
default:
}

PurgeInactiveWorkers()
}
}

如果后端发送了一条消息,但它在一段时间内没有真正发送给工作人员,我希望它过期并且永远不会发送。 是否有可以实现此目的的套接字选项?如果没有,我需要做什么才能实现这一点?

我认为我可以在没有套接字选项的情况下执行此操作的两种方法是:

1) 让后端将消息包装在包装器中并发送到 golang 队列,而不是通过 zeromq。包装器包含消息“发送”的时间。后端同时从golang队列前端拉取一条消息,检查消息是否过期。如果是,请不要发送,如果不是,请发送消息。我可以让后端先将消息添加到 golang 队列,然后在同一代码块中真正将其发送出去。这样,我就不需要锁了。

2) 通过 zeromq 将包装器消息发送到检索器,检索器检查其是否过期并提前返回。我不喜欢这样,因为它似乎不利于性能。

最佳答案

您要做的是将通信用作执行集合点。发送方想知道接收方何时收到消息。

ZMQ实现了Actor模型。您需要修改通信顺序进程模型(发送超时的模型)。基本上你需要添加控制消息流到/从工作人员,这个想法是服务器要求工作人员接收消息并且服务器等待回复。回复意味着工作人员已准备好立即接收消息,并且服务器和工作人员都已在其程序流中的发送/接收处会合。如果该回复未能在超时秒内到达,则服务器不会发送实际消息。

或者您可以通过将所有内容都交给工作人员来作弊,包裹在带有“在时间 X 发送”字段的消息中,并让工作人员决定丢弃旧消息。

关于go - 如何让 ZeroMQ 使出站排队但未在设定时间内发送的消息超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44687426/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com