gpt4 book ai didi

c# - 异步处理消息时的BasicAck

转载 作者:行者123 更新时间:2023-11-30 12:57:03 25 4
gpt4 key购买 nike

我正在尝试设置一个 RabbitMQ 消息队列,以便我可以发送一条消息来启动一个长时间运行的进程,并且还能够在需要时发送一条消息来取消该长时间运行的进程。所以我从 EventingBasicConsumer 开始,并在我的 Recieved 处理程序中做了类似的事情:

if (startProcess) 
{
// start a long running process
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);

这行不通,因为 EventingBasicConsumer 不是多线程的,一次只能处理一条消息。因此它无法处理取消消息,直到它完成长时间运行的进程(此时,显然没有意义)。所以接下来我尝试了这个:

if (startProcess) 
{
Task.Run(() => {
// start a long running process
}
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);

这行得通。我现在可以取消长时间运行的过程...但是,我确认请求立即运行长时间运行的过程,而不是在它完成之后。这意味着如果长时间运行的进程崩溃,则消息已被删除。因此,这将需要原始发件人保持跟踪,并让接收者必须发回消息说已经完成,这一切都会变得有点复杂。

所以我想也许我可以将 EventingBasicConsumer 更改为始终在新线程上触发其 Received 事件。所以我创建了这样的东西:

public class AsyncRabbitConsumer : DefaultBasicConsumer
{
// code all the same as EventingBasicConsumer except this bit:
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
base.HandleBasicDeliver(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
if (Received != null)
{
var args = new BasicDeliverEventArgs(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);

Task.Run(() =>
{
Received(this, args);
});
}
}
}

现在在我的第一段代码中,我可以让它在长时间运行的进程仍在运行时处理取消消息并且长期运行的进程不会确认并删除它的消息,直到它实际上完成(或取消)。所以那应该很棒......除非我取消我得到这个:

An exception of type 'RabbitMQ.Client.Exceptions.AlreadyClosedException' occurred in RabbitMQ.Client.dll but was not handled in user code

Additional information: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - unknown delivery tag 3", classId=60, methodId=80, cause=

从似乎是启动长时间运行进程的线程的 channel.BasicAck 步骤。那么这是怎么回事?我认为确认(首先是取消消息,然后是长时间运行的进程消息)在这里被越过了。有什么好的方法可以解决这个问题吗?还是我找错了树?

可能值得注意的是,取消长时间运行的进程不是即时的。它将在下一个方便的点取消,因此几乎可以肯定取消消息将在长时间运行的进程结束之前完成处理。

最佳答案

你可以做的是有一些类似消费者对的东西——第一个是长期运行的进程,第二个是一个代理来终止长期运行的进程。第一个会接收消息,处理它并在完成处理后进行 ACK,如果检测到 kill 信号也会进行 ACK。对中的代理显然会收到取消消息并杀死第一个,并且还会生成第一个的另一个实例。显然,这需要进程(消费者)在 RMQ 之外进行通信。

想到的另一件事(但我从来没有尝试过这样的事情)是您在消费者中将 prefetch count 设置为 2,并在“处理单个数据消息”时发布发送给代理的第二条消息(转发),除非它是 CANCEL 消息,在这种情况下,您在中止处理后确认它们 - CANCEL 和 DATA(这样调用它)消息。

另一种选择可能是在“长时间运行的进程”中,您有两个消费者线程,每个线程都使用自己的 channel 。

关于c# - 异步处理消息时的BasicAck,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36798145/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com