gpt4 book ai didi

c# - RabbitMQ 只处理 50 条消息然后阻塞

转载 作者:太空宇宙 更新时间:2023-11-03 15:49:05 29 4
gpt4 key购买 nike

我在 .net 中使用 RabbitMQ,当我在队列中放置 100 条消息时,我看到了一个奇怪的问题。它处理大约 50 条消息,然后 Dequeue() 方法挂起。如果我重新启动该服务,它会处理剩余的项目。

编辑:它正在处理恰好 50% 的队列。当我添加 1000 条消息时,它只处理 500 条。即使是单线程

我在这里错过了什么?

    private void InitializeAgent() {
var agentFactory = new ConnectionFactory() { HostName = "localhost" };
agentConnection = agentFactory.CreateConnection();
agentChannel = agentConnection.CreateModel();
var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null);
consumer = new QueueingBasicConsumer(agentChannel);
agentChannel.BasicConsume(GetType().Name, false, consumer);
}

public void DequeueMessages() {
ThreadPool.SetMaxThreads(200, 200);
ThreadPool.SetMinThreads(200, 200);
var ea = consumer.Queue.Dequeue();
ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}

public void AgentTask() {
var instance = factory.GetInstance(threadItem);

while (true)
DequeueMessages();
}

private void ProcessWorkInThread(object state) {
var ea = state as BasicDeliverEventArgs;

var message = Encoding.UTF8.GetString(ea.Body);

var settings = new JsonSerializerSettings();
settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public };
var item = JsonConvert.DeserializeObject<TEntity>(message, settings);

Thread.Sleep(10000) //simulate work
lock (agentChannel)
agentChannel.BasicAck(ea.DeliveryTag, false);
}

最佳答案

这肯定是.NET客户端版本问题,使用3.4.0,下面的代码可以正常运行。

static readonly ConnectionFactory Factory = new ConnectionFactory { HostName = "localhost" };
static readonly IConnection Connection = Factory.CreateConnection();
static QueueingBasicConsumer consumer;
static IModel agentChannel;

static CancellationTokenSource _tokenSource;

static void Main(string[] args)
{
_tokenSource = new CancellationTokenSource();

const string queueName = "testQueue";
agentChannel = Connection.CreateModel();
agentChannel.QueueDeclare(queueName, true, false, false, null);
agentChannel.QueueBind(queueName, "testExchange", "");

consumer = new QueueingBasicConsumer(agentChannel);
agentChannel.BasicConsume(queueName, false, consumer);

while (!_tokenSource.Token.IsCancellationRequested)
{
DequeueMessages();
}
Console.ReadLine();
_tokenSource.Cancel();
}

static void DequeueMessages()
{
ThreadPool.SetMaxThreads(200, 200);
ThreadPool.SetMinThreads(200, 200);
var ea = consumer.Queue.Dequeue();
ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea);
}

static void ProcessWorkInThread(object state)
{
var ea = state as BasicDeliverEventArgs;

var message = Encoding.UTF8.GetString(ea.Body);

var settings = new JsonSerializerSettings();
settings.ContractResolver = new DefaultContractResolver()
{
DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public
};
var item = JsonConvert.DeserializeObject<string>(message, settings);
Console.WriteLine(item);
Thread.Sleep(10000); //simulate work
lock (agentChannel)
agentChannel.BasicAck(ea.DeliveryTag, false);
}

关于c# - RabbitMQ 只处理 50 条消息然后阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26676850/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com