gpt4 book ai didi

c# - Aws Sqs Consumer - 仅在可以立即处理消息时进行轮询

转载 作者:行者123 更新时间:2023-12-04 08:13:06 26 4
gpt4 key购买 nike

我正在尝试创建一个 AWS SQS Windows 服务使用者,它将以 10 个为一组轮询消息。每条消息都将在其自己的任务中执行以进行并行执行。消息处理包括调用不同的 api 和发送电子邮件,因此可能需要一些时间。
我的问题是,首先,我只想在可以立即处理 10 条消息时轮询队列。这是由于 sqs 可见性超时,并且接收到的消息“等待”可能会超过可见性超时并“返回”队列。这将产生重复。我不认为调整可见性超时是好的,因为仍有可能重复消息,而这正是我试图避免的。其次,我想对并行性进行某种限制(例如,最大限制为 100 个并发任务),以便服务器资源可以保持在海湾,因为服务器中还有其他应用程序正在运行。
如何实现这一目标?或者有没有其他方法可以解决这些问题?

最佳答案

这个答案做出以下假设:

  • 从 AWS 获取消息应该被序列化。只有消息的处理应该被并行化。
  • 从 AWS 获取的每条消息都应该被处理。在所有获取的消息有机会被处理之前,整个执行不应终止。
  • 应该等待每个消息处理操作。整个执行不应在所有启动的任务完成之前终止。
  • 处理消息期间发生的任何错误都应被忽略。整个执行不应因为单个消息的处理失败而终止。
  • 从 AWS 获取消息期间发生的任何错误都应该是致命的。整个执行应该终止,但不是在所有当前运行的消息处理操作完成之前终止。
  • 执行机制应该能够处理从 AWS 中获取操作返回的消息数量与请求数量不同的批次的情况。

  • 以下是(希望)满足这些要求的实现:
    /// <summary>
    /// Starts an execution loop that fetches batches of messages sequentially,
    /// and process them one by one in parallel.
    /// </summary>
    public static async Task ExecutionLoopAsync<TMessage>(
    Func<int, Task<TMessage[]>> fetchMessagesAsync,
    Func<TMessage, Task> processMessageAsync,
    int fetchCount,
    int maxDegreeOfParallelism,
    CancellationToken cancellationToken = default)
    {
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism);

    // Count how many times we have acquired the semaphore, so that we know
    // how many more times we have to acquire it before we exit from this method.
    int acquiredCount = 0;
    try
    {
    while (true)
    {
    Debug.Assert(acquiredCount == 0);
    for (int i = 0; i < fetchCount; i++)
    {
    await semaphore.WaitAsync(cancellationToken);
    acquiredCount++;
    }

    TMessage[] messages = await fetchMessagesAsync(fetchCount)
    ?? Array.Empty<TMessage>();

    for (int i = 0; i < messages.Length; i++)
    {
    if (i >= fetchCount) // We got more messages than we asked for
    {
    await semaphore.WaitAsync();
    acquiredCount++;
    }
    ProcessAndRelease(messages[i]);
    acquiredCount--;
    }

    if (messages.Length < fetchCount)
    {
    // We got less messages than we asked for
    semaphore.Release(fetchCount - messages.Length);
    acquiredCount -= fetchCount - messages.Length;
    }

    // This method is 'async void' because it is not expected to throw ever
    async void ProcessAndRelease(TMessage message)
    {
    try { await processMessageAsync(message); }
    catch { } // Swallow exceptions
    finally { semaphore.Release(); }
    }
    }
    }
    catch (SemaphoreFullException)
    {
    // Guard against the (unlikely) scenario that the counting logic is flawed.
    // The counter is no longer reliable, so skip the awaiting in finally.
    acquiredCount = maxDegreeOfParallelism;
    throw;
    }
    finally
    {
    // Wait for all pending operations to complete. This could cause a deadlock
    // in case the counter has become out of sync.
    for (int i = acquiredCount; i < maxDegreeOfParallelism; i++)
    await semaphore.WaitAsync();
    }
    }
    用法示例:
    var cts = new CancellationTokenSource();

    Task executionTask = ExecutionLoopAsync<Message>(async count =>
    {
    return await GetBatchFromAwsAsync(count);
    }, async message =>
    {
    await ProcessMessageAsync(message);
    }, fetchCount: 10, maxDegreeOfParallelism: 100, cts.Token);

    关于c# - Aws Sqs Consumer - 仅在可以立即处理消息时进行轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65850128/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com