gpt4 book ai didi

c# - 在队列的两次消息读取之间创建延迟?

转载 作者:太空宇宙 更新时间:2023-11-03 12:27:09 25 4
gpt4 key购买 nike

我正在使用 Azure 队列执行批量导入。我正在使用 WebJobs 在后台执行该过程。队列非常频繁地出队。如何在 2 条消息之间创建延迟读?

这就是我向队列中添加消息的方式

public async Task<bool> Handle(CreateFileUploadCommand message)
{
var queueClient = _queueService.GetQueueClient(Constants.Queues.ImportQueue);

var brokeredMessage = new BrokeredMessage(JsonConvert.SerializeObject(new ProcessFileUploadMessage
{
TenantId = message.TenantId,
FileExtension = message.FileExtension,
FileName = message.Name,
DeviceId = message.DeviceId,
SessionId = message.SessionId,
UserId = message.UserId,
OutletId = message.OutletId,
CorrelationId = message.CorrelationId,

}))
{
ContentType = "application/json",
};

await queueClient.SendAsync(brokeredMessage);

return true;
}

下面是 WebJobs 函数。

public class Functions
{
private readonly IValueProvider _valueProvider;
public Functions(IValueProvider valueProvider)
{
_valueProvider = valueProvider;
}

public async Task ProcessQueueMessage([ServiceBusTrigger(Constants.Constants.Queues.ImportQueue)] BrokeredMessage message,
TextWriter logger)
{

var queueMessage = message.GetBody<string>();

using (var client = new HttpClient())
{
client.BaseAddress = new Uri(_valueProvider.Get("ServiceBaseUri"));

var stringContent = new StringContent(queueMessage, Encoding.UTF8, "application/json");

var result = await client.PostAsync(RestfulUrls.ImportMenu.ProcessUrl, stringContent);

if (result.IsSuccessStatusCode)
{
await message.CompleteAsync();
}
else
{
await message.AbandonAsync();
}
}
}
}

最佳答案

据我所知,azure webjobs sdk 在单个实例上启用并发处理(默认为 16)。

如果您运行您的 webjobs,它将读取 16 个队列消息(peeklock 并在函数成功完成时对消息调用 Complete,或者调用 Abandon)并同时创建 16 个进程来执行触发函数。所以你感觉队列出队非常频繁。

如果您想在单个实例上禁用并发处理。

我建议您可以将 ServiceBusConfiguration 的 MessageOptions.MaxConcurrentCalls 设置为 1。

更多细节,你可以引用下面的代码:

在program.cs中:

JobHostConfiguration config = new JobHostConfiguration();
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration();
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);

JobHost host = new JobHost(config);
host.RunAndBlock();

如果您想在两次消息读取之间创建延迟,我建议您可以创建自定义 ServiceBusConfiguration.MessagingProvider。

它包含 CompleteProcessingMessageAsync 方法,该方法在调用作业函数后完成对指定消息的处理。

我建议你可以在CompleteProcessingMessageAsync中添加thread.sleep方法来实现延迟读取。

更多细节,您可以引用下面的代码示例:

CustomMessagingProvider.cs:

注意:我覆盖了 CompleteProcessingMessageAsync 方法代码。

 public class CustomMessagingProvider : MessagingProvider
{
private readonly ServiceBusConfiguration _config;

public CustomMessagingProvider(ServiceBusConfiguration config)
: base(config)
{
_config = config;
}

public override NamespaceManager CreateNamespaceManager(string connectionStringName = null)
{
// you could return your own NamespaceManager here, which would be used
// globally
return base.CreateNamespaceManager(connectionStringName);
}

public override MessagingFactory CreateMessagingFactory(string entityPath, string connectionStringName = null)
{
// you could return a customized (or new) MessagingFactory here per entity
return base.CreateMessagingFactory(entityPath, connectionStringName);
}

public override MessageProcessor CreateMessageProcessor(string entityPath)
{
// demonstrates how to plug in a custom MessageProcessor
// you could use the global MessageOptions, or use different
// options per entity
return new CustomMessageProcessor(_config.MessageOptions);
}

private class CustomMessageProcessor : MessageProcessor
{
public CustomMessageProcessor(OnMessageOptions messageOptions)
: base(messageOptions)
{
}

public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
{
// intercept messages before the job function is invoked
return base.BeginProcessingMessageAsync(message, cancellationToken);
}

public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (result.Succeeded)
{
if (!MessageOptions.AutoComplete)
{
// AutoComplete is true by default, but if set to false
// we need to complete the message
cancellationToken.ThrowIfCancellationRequested();


await message.CompleteAsync();

Console.WriteLine("Begin sleep");
//Sleep 5 seconds
Thread.Sleep(5000);
Console.WriteLine("Sleep 5 seconds");

}
}
else
{
cancellationToken.ThrowIfCancellationRequested();
await message.AbandonAsync();
}
}
}
}

Program.cs主要方法:

 static void Main()
{
var config = new JobHostConfiguration();

if (config.IsDevelopment)
{
config.UseDevelopmentSettings();
}

var sbConfig = new ServiceBusConfiguration
{
MessageOptions = new OnMessageOptions
{
AutoComplete = false,
MaxConcurrentCalls = 1
}
};
sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
config.UseServiceBus(sbConfig);
var host = new JobHost(config);

// The following code ensures that the WebJob will be running continuously
host.RunAndBlock();
}

结果:enter image description here

关于c# - 在队列的两次消息读取之间创建延迟?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44330925/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com