gpt4 book ai didi

c# - 接收并发异步请求并一次处理一个

转载 作者:行者123 更新时间:2023-12-03 12:54:13 25 4
gpt4 key购买 nike

背景

我们有一个服务操作,可以接收并发的异步请求,并且必须一次处理一个请求。

在下面的示例中,UploadAndImport(...)方法在多个线程上接收并发请求,但是它对ImportFile(...)方法的调用必须一次发生一次。

裁员说明

想象一下一个有很多 worker (多线程)的仓库。人员(客户)可以同时(同时)向仓库发送许多包裹(请求)。当包裹进来时, worker 要自始至终负责,丢下包裹的人可以离开(开火忘了)。 worker 的工作是将每个包裹放进一个小斜槽,一次只能有一个 worker 将一个包裹放进一个斜槽,否则会造成困惑。如果放下包裹的人员稍后再进站(轮询端点),则仓库应能够报告包裹是否从斜槽滑落。

问题

然后的问题是如何编写一个服务操作...

  • 可以接收并发客户端请求
  • 在多个线程
  • 上接收并处理这些请求
  • 在接收到请求的同一线程
  • 上处理请求
  • 一次处理一个请求,
  • 是一种一劳永逸的操作,而
  • 有一个单独的轮询端点,该端点报告请求完成情况。

  • 我们已经尝试了以下方法,并且想知道两件事:
  • 是否有我们没有考虑过的比赛条件?
  • 在面向服务的体系结构(我们碰巧正在使用WCF)中,是否有更规范的方法在C#.NET中编码此方案?

  • 示例:我们尝试过什么?

    这是我们尝试过的服务代码。它的工作原理虽然听起来有些像是乱砍或l头。
    static ImportFileInfo _inProgressRequest = null;

    static readonly ConcurrentDictionary<Guid, ImportFileInfo> WaitingRequests =
    new ConcurrentDictionary<Guid, ImportFileInfo>();

    public void UploadAndImport(ImportFileInfo request)
    {
    // Receive the incoming request
    WaitingRequests.TryAdd(request.OperationId, request);

    while (null != Interlocked.CompareExchange(ref _inProgressRequest, request, null))
    {
    // Wait for any previous processing to complete
    Thread.Sleep(500);
    }

    // Process the incoming request
    ImportFile(request);

    Interlocked.Exchange(ref _inProgressRequest, null);
    WaitingRequests.TryRemove(request.OperationId, out _);
    }

    public bool UploadAndImportIsComplete(Guid operationId) =>
    !WaitingRequests.ContainsKey(operationId);

    这是示例客户端代码。
    private static async Task UploadFile(FileInfo fileInfo, ImportFileInfo importFileInfo)
    {
    using (var proxy = new Proxy())
    using (var stream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read))
    {
    importFileInfo.FileByteStream = stream;
    proxy.UploadAndImport(importFileInfo);
    }

    await Task.Run(() => Poller.Poll(timeoutSeconds: 90, intervalSeconds: 1, func: () =>
    {
    using (var proxy = new Proxy())
    {
    return proxy.UploadAndImportIsComplete(importFileInfo.OperationId);
    }
    }));
    }

    很难在Fiddle中编写一个最小的可行示例,但是 here is a start可以给人一种感觉并且可以编译。

    和以前一样,以上内容似乎很容易破解,我们既在询问其方法可能存在的陷阱,又在询问更合适/规范的替代模式。

    最佳答案

    在出现线程数限制的情况下,使用Producer-Consumer模式来管道请求的简单解决方案。

    您仍然必须实现一个简单的进度报告程序或事件。我建议用Microsoft的SignalR库提供的异步通信代替昂贵的轮询方法。它使用WebSocket启用异步行为。客户端和服务器可以在集线器上注册其回调。客户端现在可以使用RPC调用服务器端方法,反之亦然。您将使用中心(客户端)将进度发布到客户端。以我的经验,SignalR非常简单易用,并且有很好的文档记录。它具有用于所有著名的服务器端语言(例如Java)的库。

    在我的理解中,轮询与“一劳永逸”完全相反。您不能忘记,因为您必须根据时间间隔检查某些内容。自触发后,基于事件的通信(如SignalR)就一劳永逸,并且会得到提醒(因为您忘记了)。 “事件侧”将调用您的回调,而不是您自己等待做!

    需求5被忽略了,因为我没有任何理由。等待线程完成将消除火灾并忘记字符。

    private BlockingCollection<ImportFileInfo> requestQueue = new BlockingCollection<ImportFileInfo>();
    private bool isServiceEnabled;
    private readonly int maxNumberOfThreads = 8;
    private Semaphore semaphore = new Semaphore(numberOfThreads);
    private readonly object syncLock = new object();

    public void UploadAndImport(ImportFileInfo request)
    {
    // Start the request handler background loop
    if (!this.isServiceEnabled)
    {
    this.requestQueue?.Dispose();
    this.requestQueue = new BlockingCollection<ImportFileInfo>();

    // Fire and forget (requirement 4)
    Task.Run(() => HandleRequests());
    this.isServiceEnabled = true;
    }

    // Cache multiple incoming client requests (requirement 1) (and enable throttling)
    this.requestQueue.Add(request);
    }

    private void HandleRequests()
    {
    while (!this.requestQueue.IsCompleted)
    {
    // Wait while thread limit is exceeded (some throttling)
    this.semaphore.WaitOne();

    // Process the incoming requests in a dedicated thread (requirement 2) until the BlockingCollection is marked completed.
    Task.Run(() => ProcessRequest());
    }

    // Reset the request handler after BlockingCollection was marked completed
    this.isServiceEnabled = false;
    this.requestQueue.Dispose();
    }

    private void ProcessRequest()
    {
    ImportFileInfo request = this.requestQueue.Take();
    UploadFile(request);

    // You updated your question saying the method "ImportFile()" requires synchronization.
    // This a bottleneck and will significantly drop performance, when this method is long running.
    lock (this.syncLock)
    {
    ImportFile(request);
    }

    this.semaphore.Release();
    }

    评论:
  • BlockingCollection是一个IDisposable
  • TODO:您必须通过将BlockingCollection标记为完成来“关闭”它:
    “BlockingCollection.CompleteAdding()”,否则它将无限期地循环等待更多请求。也许您为客户端引入了其他请求方法,以取消和/或更新过程并将添加到BlockingCollection的标记为已完成。或者是一个计时器,它在将其标记为完成之前等待空闲时间。或者使您的请求处理程序线程阻塞或旋转。
  • 如果要取消支持,请用TryTake(...)和TryAdd(...)替换Take()和Add(...)
  • 代码未经测试
  • 您的“ImportFile()”方法是多线程环境中的瓶颈。我建议使其线程安全。在需要同步的I/O的情况下,我会将数据缓存在BlockingCollection中,然后将它们一个接一个地写入I/O。
  • 关于c# - 接收并发异步请求并一次处理一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50184497/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com