gpt4 book ai didi

c# - 公共(public)交通 - 如何使用 Azure 服务总线安排消息

转载 作者:行者123 更新时间:2023-12-02 06:16:07 31 4
gpt4 key购买 nike

我已经检查了有关 Scheduling with Azure Service Bus 的文档,但我不清楚如何从“断开的”总线发送消息。

以下是我如何配置在服务器上处理消息的服务:

builder.AddMassTransit(mt =>
{
mt.AddConsumers(cqrsAssembly);

mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MaxConcurrentCalls = 500;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
x.UseServiceBusMessageScheduler();

var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = "key-name";
s.SharedAccessKey = "access-key";
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});

h.OperationTimeout = TimeSpan.FromMinutes(2);
});

x.ReceiveEndpoint(host, $"mt.myqueue", ep =>
{
ep.RequiresSession = true;
ep.MaxConcurrentCalls = 500;
ep.RemoveSubscriptions = true;

ep.UseMessageRetry(r =>
{
r.Interval(4, TimeSpan.FromSeconds(30));
r.Handle<TransientCommandException>();
});

ep.ConfigureConsumers(context);
});
});
});

我已明确调用 UseServiceBusMessageScheduler()

在创建消息并将其发送到队列的项目中(在不同的上下文中运行,因此完成“仅发送”),我们有以下内容:

var bus = Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
x.Send<ICommand>(s => s.UseSessionIdFormatter(ctx => ctx.Message.SessionId ?? Guid.NewGuid().ToString()));


var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = "key-name";
s.SharedAccessKey = "key";
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
h.OperationTimeout = TimeSpan.FromMinutes(2);
});

EndpointConvention.Map<ICommand>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
EndpointConvention.Map<Command>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
});

现在,要发送预定消息,我们这样做:

var dest = "what?";
await bus.ScheduleSend(dest, scheduledEnqueueTimeUtc.Value, message);

我不确定需要将什么传递到destinationAddress

我尝试过:- serviceUri- `{serviceUri}mt.myqueue"

但是检查队列时,我在基本队列、​​skipped 队列或error 队列中都没有看到我的消息。

我是否缺少一些其他配置,如果没有,如何确定目标队列?

我正在使用 Mass Transit 的 5.5.4 版本,ScheduleSend() 的每次重载都需要它。

最佳答案

首先,您的 Uri 格式是正确的。最后格式化后你需要这样的东西:

new Uri(@"sb://yourdomain.servicebus.windows.net/yourapp/your_message_queue")

另请确保您在配置端点时添加了。 (见下面的链接)

configurator.UseServiceBusMessageScheduler();

如果您遵循公共(public)交通文档,调度是通过 ConsumeContext 完成的。请参阅Mass-Transit Azure Scheduling

public class ScheduleNotificationConsumer :
IConsumer<AssignSeat>
{
Uri _schedulerAddress;
Uri _notificationService;

public async Task Consume(ConsumeContext<AssignSeat> context)
{
if(context.Message.ReservationTime - DateTime.Now < TimeSpan.FromHours(8))
{
// assign the seat for the reservation
}
else
{
// seats can only be assigned eight hours before the reservation
context.ScheduleMessage(context.Message.ReservationTime - TimeSpan.FromHours(8), context.Message);
}
}
}

但是,在本周我们面临的一个用例中,我们需要从 ConsumerContext 外部进行调度,或者只是不想将上下文转发到我们调度的位置。当使用 IBusControl.ScheduleSend 时,我们没有得到任何错误反馈,但我们也没有真正完成任何调度。 在查看了 Mass-Transit 的作用后,发现它从 IBusControl 创建了一个新的调度提供程序。而在 Context 中,它使用 ServiceBusScheduleMessageProvider。

因此,在清理这一点之前,我们现在要做的就是直接调用 ServiceBusScheduleMessageProvider。

        await new ServiceBusScheduleMessageProvider(_busControl).ScheduleSend(destinationUri
, scheduleDateTime.UtcDateTime
, Task.FromResult<T>(message)
, Pipe.Empty<SendContext>()
, default);

希望它有意义并且有所帮助。

关于c# - 公共(public)交通 - 如何使用 Azure 服务总线安排消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57387193/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com