gpt4 book ai didi

c# - MassTransit - PrefetchCount 和单个消费者的多个 channel 的解释

转载 作者:行者123 更新时间:2023-12-04 01:28:03 30 4
gpt4 key购买 nike

我一直在玩 PreFetch,并试图弄清楚为什么 PreFetch 在队列的管理界面上总是设置为 0。在 RabbitMQ 管理界面中,我可以看到 channel 上配置的 Prefetch,但看不到队列本身。我还注意到它们被注册为“全局”而不是“每个消费者”,但对于我的生活,我似乎无法在 MassTransit 中找到改变它的设置,尽管我猜我有一个误解关于这是如何工作的,文档并没有帮助给我一个 ELI5。

这是一个示例配置:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});

cfg.ReceiveEndpoint(
host,
"TEST-QUEUE-PF",
ec =>
{
ec.Consumer<MyConsumer>(context);
ec.PrefetchCount = 50; // consumer specific
ec.UseConcurrencyLimit(1); // consumer specific
});

cfg.PrefetchCount = 100; // bus control specific
cfg.UseConcurrencyLimit(1); // bus control specific
});

这将创建以下队列:

Queue

然后查看 channel ,我看到以下有关预取的信息:

enter image description here

如果我查看所有 channel ,我会看到以下内容:

enter image description here

我正在努力理解这些 PrefetchCounts 中的每一个与什么相关。

作为背景知识,我们有几个运行消费者的多核服务器(即循环,或者更恰本地说是“饥饿的河马”,因为我不关心平等分配)。 PrefetchCount 和 ConcurrencyLimit 的默认设置对我们来说不是很好,因为我们的消费者有很多工作要做,而且数据库服务器重载导致超时。我正在寻找一种方法来配置这些消费者,这样他们就不会那样做。

这是 MassTransit 5.5.5,因为任何超过这一点的东西都会破坏 UseSerilog() 集成,而且我找不到简单的升级路径。 Erlang 和 RabbitMq 本身就是当前版本。这是更详细的 AutoFac 模块:
private class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
builder.Register(context =>
{
var busSettings = context.Resolve<BusSettings>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});

cfg.ReceiveEndpoint(
host,
$"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
ec =>
{
ec.PrefetchCount = 50;
ec.UseConcurrencyLimit(2);
ec.Consumer<MyConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
retryConfig
.Intervals(new[] { 1, 2, 4, 8, 16, 32 }
.Select(t => TimeSpan.FromMinutes(t))
.ToArray());
retryConfig
.Handle<HttpRequestException>();
retryConfig
.Handle<SwaggerException>(ex => ex.IsRetryValid());
});
});

cfg.PrefetchCount = 100;
cfg.UseConcurrencyLimit(2);
cfg.UseSerilog();

var correlationIdProvider = context.Resolve<ICorrelationProvider>();
cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
{
sendContext.CorrelationId =
sendContext.CorrelationId == Guid.Empty ?
correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
}));
});

return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}

最佳答案

首先,我假设您使用的是较旧版本的 MassTransit,因为从 v6 开始,切换是为了远离全局预取。

其次,高预取计数与 1 的并发限制相结合将导致 (prefetchcount - 1) 条消息位于接收端点上等待处理,而一次处理 1 条消息。因此,如果只有 50 条消息,第一个节点可能会全部获取它们,然后您的其他节点处于空闲状态,因为消息正在单个节点上等待通过瓶颈。

带有 channel 预取的 RabbitMQ 管理控制台的当前版本如下所示:

RabbitMQ Prefetch Count

由于 MassTransit 仅将单个消费者放在 channel 上,以前的方法本质上将消费者限制在全局 channel 预取中,但现在它更加明确。此外,新设置适用于不支持全局预取设置的仲裁队列。

如果您的数据库重载,并且已经优化了数据库查询以避免锁定/阻塞,并且需要减少流量,请将预取降低到接近并发限制的 140%。所以,说真的,如果你是 1,请将 prefetch 设置为 2。

关于c# - MassTransit - PrefetchCount 和单个消费者的多个 channel 的解释,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61492494/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com