gpt4 book ai didi

c# - Azure Service Fabric 消息队列

转载 作者:行者123 更新时间:2023-11-30 23:23:40 25 4
gpt4 key购买 nike

我正在尝试对一系列任务进行排队并使用 Azure Service Fabric 异步运行它们。我目前正在将 CloudMessageQueue 与辅助角色一起使用。我正在尝试迁移到 Service Fabric。从 worker 角色来看,这是我的代码:

    private void ExecuteTask()
{
CloudQueueMessage message = null;

if (queue == null)
{
jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Queue for WorkerRole2 is null. Exiting.")));
return;
}

try
{
message = queue.GetMessage();
if (message != null)
{
JMATask task = GetTask(message.AsString);
string msg = (message == null) ? string.Empty : message.AsString;
//jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.JMA, String.Format("Executing task {0}", msg)));
queue.DeleteMessage(message);
PerformTask(task);
}
}
catch (Exception ex)
{
string msg = (message == null) ? string.Empty : message.AsString;
jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString())));
}
}

我有一些问题:

  1. 如何异步运行执行任务方法?我想同时运行大约 30 - 40 个任务。
  2. 我有一个 JMATask 列表。如何将列表添加到队列中?
  3. 列表是否需要添加到队列中?

    namespace Stateful1
    {
    public class JMATask
    {
    public string Name { get; set; }
    }

    /// <summary>
    /// An instance of this class is created for each service replica by the Service Fabric runtime.
    /// </summary>
    internal sealed class Stateful1 : StatefulService
    {
    public Stateful1(StatefulServiceContext context)
    : base(context)
    { }

    /// <summary>
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests.
    /// </summary>
    /// <remarks>
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication
    /// </remarks>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
    return new ServiceReplicaListener[0];
    }

    /// <summary>
    /// This is the main entry point for your service replica.
    /// This method executes when this replica of your service becomes primary and has write status.
    /// </summary>
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param>
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
    // TODO: Replace the following sample code with your own logic
    // or remove this RunAsync override if it's not needed in your service.

    IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
    //var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");

    while (true)
    {
    cancellationToken.ThrowIfCancellationRequested();

    using (var tx = this.StateManager.CreateTransaction())
    {
    var result = await tasks.TryDequeueAsync(tx);

    //how do I execute this method async?
    PerformTask(result.Value);

    //Create list of JMA Tasks to queue up
    await tasks.EnqueueAsync(tx, new JMATask());

    //var result = await myDictionary.TryGetValueAsync(tx, "Counter");

    //ServiceEventSource.Current.ServiceMessage(this, "Current Counter Value: {0}",
    // result.HasValue ? result.Value.ToString() : "Value does not exist.");

    //await myDictionary.AddOrUpdateAsync(tx, "Counter", 0, (key, value) => ++value);

    // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are
    // discarded, and nothing is saved to the secondary replicas.
    await tx.CommitAsync();
    }

    await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    }
    }

    private async void PerformTask(JMATask task)
    {
    //execute task
    }

    }

最佳答案

RunAsync 方法不应包含以下行代码:awaittasks.EnqueueAsync(tx, new JMATask());

创建要排队的 JMA 任务列表应该是有状态服务中的另一种方法,如下所示:

    public async Task AddJMATaskAsync(JMATask task)
{
var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
using (var tx = StateManager.CreateTransaction())
{
try
{
await tasksQueue.EnqueueAsync(tx, request);
await tx.CommitAsync();
}
catch (Exception ex)
{
tx.Abort();
}
}
}

然后您的 PerformTask 方法可以包含对无状态微服务的调用:

    public async Task PerformTask (JMATask task)
{
//1. resolve stateless microservice URI
// statelessSvc

//2. call method of the stateless microservice
// statelessSvc.PerformTask(task);
}

基本上,有状态服务只会对任务进行排队和出队。执行实际任务可以通过微服务来完成,该微服务可供集群中的所有节点使用。

关于c# - Azure Service Fabric 消息队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38246022/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com