
c# - Problem throttling TPL Dataflow with SemaphoreSlim

Reposted | Author: 行者123 | Updated: 2023-12-04 12:55:42

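Scope:

  • I want to process a large file (1 GB+) by splitting it into smaller, manageable chunks (partitions), saving them on some storage infrastructure (local disk, blob, network, etc.), and processing them in memory one by one.
  • I want to achieve this with the TPL Dataflow library: I create several processing blocks that operate on the in-memory file partitions, each performing a specific operation.
  • In addition, I use a SemaphoreSlim object to limit the maximum number of in-memory partitions being processed at any given time, from the moment a partition is loaded until it has been fully processed.
  • I also use the MaxDegreeOfParallelism configuration property at block level to limit the degree of parallelism of each block.

  • From a technical point of view, the goal is to use a semaphore to limit the parallel processing of multiple partitions across several consecutive pipeline steps, and thereby avoid memory overload.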
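As a rough illustration of the block-level limit mentioned in the list above, the sketch below (not part of the original question) runs 20 items through a single ActionBlock with MaxDegreeOfParallelism = 3 and records the highest number of delegate invocations running at the same time. The names ParallelismDemo and MeasurePeakConcurrencyAsync and the 50 ms of simulated work are illustrative assumptions; like the question's code, it needs a project that references System.Threading.Tasks.Dataflow.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not from the original post.
public static class ParallelismDemo
{
    // Runs itemCount items through one ActionBlock capped at maxDop and returns
    // the highest number of delegate invocations observed at the same time.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount, int maxDop)
    {
        var gate = new object();
        int current = 0;
        int peak = 0;

        var block = new ActionBlock<int>(async item =>
        {
            lock (gate)
            {
                current++;
                if (current > peak) peak = current;
            }

            await Task.Delay(50); // simulated per-partition work (assumption)

            lock (gate)
            {
                current--;
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDop });

        for (int i = 0; i < itemCount; i++)
        {
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakConcurrencyAsync(itemCount: 20, maxDop: 3);
        Console.WriteLine($"Peak concurrent invocations: {peak}");
    }
}
```

The cap applies to one block only; blocks do not coordinate their limits with each other, which is why the question relies on a shared SemaphoreSlim for the cross-block limit.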
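Problem description: when MaxDegreeOfParallelism is set to a value greater than 1 on every Dataflow block except the first one, the process hangs and appears to deadlock. When MaxDegreeOfParallelism is set to 1, everything works as expected. Code sample below...

Do you have any ideas or hints as to why this happens?
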
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace DemoConsole
    {
        class Program
        {
            private static readonly SemaphoreSlim _localSemaphore = new(1);

            static async Task Main(string[] args)
            {
                Console.WriteLine("Configuring pipeline...");

                var dataflowLinkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

                var filter1 = new TransformManyBlock<string, PartitionInfo>(CreatePartitionsAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

                // when MaxDegreeOfParallelism on the below line is set to 1, everything works as expected; any value greater than 1 causes issues
                var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

                var filter2 = new TransformBlock<PartitionInfo, PartitionInfo>(ReadPartitionAsync, blockOptions);
                var filter3 = new TransformBlock<PartitionInfo, PartitionInfo>(MapPartitionAsync, blockOptions);
                var filter4 = new TransformBlock<PartitionInfo, PartitionInfo>(ValidatePartitionAsync, blockOptions);

                var actionBlock = new ActionBlock<PartitionInfo>(async (x) => { await Task.CompletedTask; });

                filter1.LinkTo(filter2, dataflowLinkOptions);
                filter2.LinkTo(filter3, dataflowLinkOptions);
                filter3.LinkTo(filter4, dataflowLinkOptions);
                filter4.LinkTo(actionBlock, dataflowLinkOptions);

                await filter1.SendAsync("my-file.csv");

                filter1.Complete();

                await actionBlock.Completion;

                Console.WriteLine("Pipeline completed.");
                Console.ReadKey();
                Console.WriteLine("Done");
            }

            private static async Task<IEnumerable<PartitionInfo>> CreatePartitionsAsync(string input)
            {
                var partitions = new List<PartitionInfo>();
                const int noOfPartitions = 10;

                Log($"Creating {noOfPartitions} partitions from raw file on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");

                for (short i = 1; i <= noOfPartitions; i++)
                {
                    partitions.Add(new PartitionInfo { FileName = $"{Path.GetFileNameWithoutExtension(input)}-p{i}-raw.json", Current = i });
                }

                await Task.CompletedTask;

                Log($"Creating {noOfPartitions} partitions from raw file completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

                return partitions;
            }

            private static async Task<PartitionInfo> ReadPartitionAsync(PartitionInfo input)
            {
                Log($"Semaphore - trying to enter for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
                await _localSemaphore.WaitAsync();
                Log($"Semaphore - entered for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");

                Log($"Reading partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
                await Task.Delay(1000);
                Log($"Reading partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

                return input;
            }

            private static async Task<PartitionInfo> MapPartitionAsync(PartitionInfo input)
            {
                Log($"Mapping partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
                await Task.Delay(1000);
                Log($"Mapping partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

                return input;
            }

            private static async Task<PartitionInfo> ValidatePartitionAsync(PartitionInfo input)
            {
                Log($"Validating partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
                await Task.Delay(1000);
                Log($"Validating partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

                Log($"Semaphore - releasing - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
                _localSemaphore.Release();
                Log($"Semaphore - released - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");

                return input;
            }

            private static void Log(string message) => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} : {message}");
        }

        class PartitionInfo
        {
            public string FileName { get; set; }
            public short Current { get; set; }
        }
    }

Best answer

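Before implementing this solution, please read the comments, because there is a fundamental architectural problem in your code.
That said, the problem you posted is reproducible and can be fixed with the following ExecutionDataflowBlockOptions change: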

    var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, EnsureOrdered = false };
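The EnsureOrdered property defaults to true. When the degree of parallelism is greater than 1, there is no guarantee which message gets processed first. If the message that finishes first is not the first message the block received, it waits in the reordering buffer until that first message has completed. In this pipeline that wait never ends: the buffered message already holds the only semaphore slot (acquired in ReadPartitionAsync in filter2 and released only in ValidatePartitionAsync in filter4), while the earlier message is blocked in ReadPartitionAsync waiting for that same slot. Because filter1 is a TransformManyBlock, I'm not sure it is even possible to know in which order each message is sent to filter2.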
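The reordering buffer can be observed in isolation with a small sketch (the names and delays are illustrative assumptions, not from the original post). Items 1 to 5 go through a TransformBlock with MaxDegreeOfParallelism = 5, and item 1 is given the longest simulated delay, so it finishes last. With EnsureOrdered = true the block still emits 1, 2, 3, 4, 5, holding the already finished items back until item 1 is done; with EnsureOrdered = false results typically leave in completion order, so item 1 arrives last.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not from the original post.
public static class OrderingDemo
{
    // Sends items 1..5 through a TransformBlock where item 1 is the slowest and
    // item 5 the fastest, and returns the order in which results leave the block.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = ensureOrdered
        };

        var transform = new TransformBlock<int, int>(async item =>
        {
            await Task.Delay((6 - item) * 100); // item 1: 500 ms ... item 5: 100 ms
            return item;
        }, options);

        var received = new List<int>();
        // ActionBlock defaults to MaxDegreeOfParallelism = 1, so List.Add is never called concurrently.
        var sink = new ActionBlock<int>(item => received.Add(item));
        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 5; i++)
        {
            await transform.SendAsync(i);
        }

        transform.Complete();
        await sink.Completion;
        return received;
    }

    public static async Task Main()
    {
        Console.WriteLine("EnsureOrdered = true:  " + string.Join(", ", await RunAsync(true)));
        Console.WriteLine("EnsureOrdered = false: " + string.Join(", ", await RunAsync(false)));
    }
}
```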
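If you run the code often enough, you will eventually get lucky: the first message sent to filter2 is also the first one processed, in which case it releases the semaphore and the pipeline makes progress. But you hit the same problem again with the next message; if it is not the second message received, it will wait in the reordering buffer.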
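The interaction between the reordering buffer and the semaphore can also be reproduced on purpose. The sketch below is a reduced, hypothetical version of the question's pipeline: a single SemaphoreSlim slot is taken in the first TransformBlock and released only in a downstream ActionBlock. The assumption that makes it repeatable is an artificial 300 ms delay before partition 1 tries to enter the semaphore, so some later partition always wins the slot first. CompletesAsync (an illustrative name) reports whether the pipeline finishes within a timeout instead of hanging.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical, reduced version of the question's pipeline.
public static class ReorderDeadlockDemo
{
    // Returns true if all partitions get through within the timeout,
    // false if the pipeline is still stuck when the timeout expires.
    public static async Task<bool> CompletesAsync(bool ensureOrdered, TimeSpan timeout)
    {
        // One slot, as in the question: taken in the first stage, released in a later stage.
        var semaphore = new SemaphoreSlim(1);
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = ensureOrdered
        };

        var read = new TransformBlock<int, int>(async partition =>
        {
            // Assumption for the demo: partition 1 starts late, so a later
            // partition always takes the only slot first.
            if (partition == 1)
            {
                await Task.Delay(300);
            }

            await semaphore.WaitAsync();
            await Task.Delay(50); // simulated read
            return partition;
        }, options);

        // Stand-in for ValidatePartitionAsync: the slot is freed only here.
        var validate = new ActionBlock<int>(partition =>
        {
            semaphore.Release();
        });

        read.LinkTo(validate, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 10; i++)
        {
            await read.SendAsync(i);
        }

        read.Complete();

        Task finished = await Task.WhenAny(validate.Completion, Task.Delay(timeout));
        return finished == validate.Completion;
    }

    public static async Task Main()
    {
        TimeSpan timeout = TimeSpan.FromSeconds(5);
        Console.WriteLine($"EnsureOrdered = false completes: {await CompletesAsync(false, timeout)}");
        Console.WriteLine($"EnsureOrdered = true completes:  {await CompletesAsync(true, timeout)}");
    }
}
```

With EnsureOrdered = true, the partition holding the slot sits in the reordering buffer waiting for partition 1, while partition 1 waits for the slot, so the call times out. With EnsureOrdered = false, the finished partition is forwarded immediately, the slot is released, and all ten partitions get through. In the question's code the same interleaving arises from scheduling rather than from a deliberate delay.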

Regarding "c# - Problem throttling TPL Dataflow with SemaphoreSlim", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/67973108/
