gpt4 book ai didi

azure - 如何并行化 azure worker 角色?

转载 作者:行者123 更新时间:2023-12-03 01:10:15 25 4
gpt4 key购买 nike

我有一个在 azure 中运行的辅助角色。

该工作进程处理一个包含大量整数的队列。对于每个整数,我必须进行相当长的处理(根据整数从 1 秒到 10 分钟)。

由于这非常耗时,我想并行进行这些处理。不幸的是,当我使用 400 个整数的队列进行测试时,我的并行化似乎效率不高。

这是我的实现:

  public class WorkerRole : RoleEntryPoint {
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private readonly Manager _manager = Manager.Instance;
private static readonly LogManager logger = LogManager.Instance;

public override void Run() {
logger.Info("Worker is running");

try {
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
catch (Exception e) {
logger.Error(e, 0, "Error Run Worker: " + e);
}
finally {
this.runCompleteEvent.Set();
}
}

public override bool OnStart() {
bool result = base.OnStart();

logger.Info("Worker has been started");

return result;
}

public override void OnStop() {
logger.Info("Worker is stopping");

this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();

base.OnStop();

logger.Info("Worker has stopped");
}

private async Task RunAsync(CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
try {
_manager.ProcessQueue();
}
catch (Exception e) {
logger.Error(e, 0, "Error RunAsync Worker: " + e);
}
}
await Task.Delay(1000, cancellationToken);

}
}
}

以及ProcessQueue的实现:

  public void ProcessQueue() {
try {

_queue.FetchAttributes();

int? cachedMessageCount = _queue.ApproximateMessageCount;

if (cachedMessageCount != null && cachedMessageCount > 0) {

var listEntries = new List<CloudQueueMessage>();

listEntries.AddRange(_queue.GetMessages(MAX_ENTRIES));

Parallel.ForEach(listEntries, ProcessEntry);
}
}
catch (Exception e) {
logger.Error(e, 0, "Error ProcessQueue: " + e);
}
}

和ProcessEntry

    private void ProcessEntry(CloudQueueMessage entry) {
try {
int id = Convert.ToInt32(entry.AsString);

Service.GetData(id);

_queue.DeleteMessage(entry);

}
catch (Exception e) {
_queueError.AddMessage(entry);
_queue.DeleteMessage(entry);
logger.Error(e, 0, "Error ProcessEntry: " + e);
}
}

在 ProcessQueue 函数中,我尝试使用不同的 MAX_ENTRIES 值:首先 =20,然后 =2。当 MAX_ENTRIES=20 时,它似乎更慢,但无论 MAX_ENTRIES 的值是多少,它似乎都很慢。

我的虚拟机是 A2 介质。

我真的不知道我是否正确地进行了并行化;也许问题来自于工作人员本身(这可能很难并行)。

最佳答案

您没有提到您正在使用哪种 Azure 消息队列技术,但是对于我想要并行处理多个消息的任务,我倾向于在服务总线队列和订阅上使用消息泵模式 ,利用服务总线队列和订阅客户端上可用的 OnMessage() 方法:

来自 MSDN:

When calling OnMessage(), the client starts an internal message pump that constantly polls the queue or subscription. This message pump consists of an infinite loop that issues a Receive() call. If the call times out, it issues the next Receive() call.

此模式允许您使用委托(delegate)(在我的首选情况下是匿名函数)来处理 WaWorkerHost 进程上的单独线程上的 Brokered Message 实例的接收。事实上,为了提高吞吐量水平,您可以指定消息泵应提供的线程数量,从而允许您并行接收和处理队列中的 2、4、8 条消息。您还可以告诉消息泵在委托(delegate)成功处理消息后自动将消息标记为完成。线程计数和自动完成指令都在重载方法的 OnMessageOptions 参数中传递。

public override void Run()
{
var onMessageOptions = new OnMessageOptions()
{
AutoComplete = true, // Message-Pump will call Complete on messages after the callback has completed processing.
MaxConcurrentCalls = 2 // Max number of threads the Message-Pump can spawn to process messages.
};

sbQueueClient.OnMessage((brokeredMessage) =>
{

// Process the Brokered Message Instance here

}, onMessageOptions);

RunAsync(_cancellationTokenSource.Token).Wait();
}

如果需要,您仍然可以利用 RunAsync() 方法在主辅助角色线程上执行其他任务。

最后,我还建议您考虑将辅助角色实例扩展到至少 2 个(用于容错和冗余),以提高总体吞吐量。根据我在该模式的多个生产部署中所看到的情况,当多个辅助角色实例运行时,OnMessage() 执行得非常完美。

关于azure - 如何并行化 azure worker 角色?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31589239/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com