gpt4 book ai didi

rabbitmq - MassTransit 将消息速率上限限制为 10

转载 作者:行者123 更新时间:2023-12-02 19:24:03 24 4
gpt4 key购买 nike

我设置了一个与 RabbitMQ 配合使用的公共(public)交通消费者服务,但我不知道如何提高消费者的速度 - 它似乎硬性限制为每秒接收 10 条消息。

我已尝试此处列出的步骤:https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ ,但没有成功 - 将预取和并发消费者设置为 25 除了增加已确认的消息之外没有任何作用,但它不会增加下载消息的速率。

我的配置如下:

ServiceBusFactory.ConfigureDefaultSettings(x =>
{
x.SetConcurrentReceiverLimit(25);
x.SetConcurrentConsumerLimit(25);
});

_bus = ServiceBusFactory.New(
sbc =>
{
sbc.UseRabbitMq(x =>
x.ConfigureHost(
"rabbitmq://localhost/Dev/consume?prefetch=25",
y =>
{
y.SetUsername(config.Username);
y.SetPassword(config.Password);
}));
sbc.UseLog4Net();
sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
sbc.Subscribe(x => RegisterConsumers(x, container));
sbc.UseJsonSerializer();
sbc.SetConcurrentConsumerLimit(25);
});

我在两个地方设置并发消费者限制,因为我不确定是否需要在默认设置或总线配置中设置它,并且消费者是通过统一注册的 - 我省略了消费者订阅,因为所有订阅者都在接收。

我有点困惑是否还需要设置其他内容,或者是否需要更改设置配置的顺序。

非常感谢任何帮助。

最佳答案

在与这个问题度过了一个浪漫的夜晚并尝试了克里斯建议的不同方法之后,我发现您还需要做另一件事才能使其正常工作。

具体来说,,您需要在消费者队列地址上设置预取:

sbc.UseRabbitMq(
f =>
f.ConfigureHost(
new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
c =>
{
} )
);

int pf = 20; // prefetch

// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );

但这还不够。

关键可以在克里斯在他的答案下面的评论中提到的mtstress工具的代码中找到。事实证明该工具调用:

int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );

将此添加到我的代码中可以解决该问题。我想知道为什么 MSMQ 传输不需要这样做...

更新#1

经过进一步调查,我发现了可能的罪魁祸首。它位于 ServiceBusBuilderImpl 中。

有一种方法可以提高限制,即ConfigureThreadPool

这里的问题是它调用了CalculateRequiredThreads,它应该返回所需线程的数量。不幸的是,后者在我的客户端 Windows 7 和 Windows Server 上都返回值。因此,ConfigureThreadPool 实际上不执行任何操作,因为在调用 ThreadPool.SetMin/MaxThreads 时,负值将被忽略。

这个负值怎么办?看来 CalculateRequiredThreads 调用 ThreadPool.GetMinThreadsThreadPool.GetAvailableThreads 并使用公式来得出所需线程的数量:

var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);

这里的问题是,在我的机器上,这实际上是:

40 (my limit) + 8 (workerThreads) - 1023 (availableThreads) 

当然会返回

-975

结论是:上面来自大众交通内部的代码似乎是错误的。当我提前手动提高限制时,ConfigureMinThreads 会尊重它(因为它仅在高于读取值时才设置限制)。

如果没有提前手动设置限制,则无法设置限制,因此代码会执行与默认线程池限制一样多的线程(在我的机器上似乎是 8)。

显然有人认为这个公式会产生

40 + 8 - 8

在默认情况下。为什么 GetMinThreadsGetAvailableThreads 返回如此不相关的值尚未确定......

更新#2

改变

    static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}

    static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}

解决了问题。此处均返回 1023,并且公式的输出是正确的预期线程数。

关于rabbitmq - MassTransit 将消息速率上限限制为 10,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23997190/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com