gpt4 book ai didi

c# - 如何拆分和合并此数据流管道?

转载 作者:行者123 更新时间:2023-12-04 04:22:40 25 4
gpt4 key购买 nike

我正在尝试使用具有以下形式的 tpl 创建数据流:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
-> LoadDataBlock3 -> ProcessDataBlock3 ->
...
-> LoadDataBlockN -> ProcessDataBlockN ->

这个想法是, GetInputPathsBlock是一个块,它找到要加载的输入数据的路径,然后将路径发送到每个 LoadDataBlock . LoadDataBlocks 都是相同的(除了它们每个都从 GetInputPaths 接收到一个唯一的 inputPath 字符串)。然后将加载的数据发送到 ProcessDataBlock ,它做了一些简单的处理。然后来自每个 ProcessDataBlock的数据发送至 MergeDataBlock ,合并它并将其发送到 SaveDataBlock ,然后将其保存到文件中。

将其视为需要每个月运行的数据流。首先找到每天数据的路径。每天的数据都会被加载和处理,然后将整个月的数据合并在一起并保存。每个月可以并行运行,可以并行加载和并行处理一个月中的每一天的数据(在加载了个别日期的数据之后),并且一旦加载并处理了该月的所有内容,就可以合并并保存.

我试过的

据我所知 TransformManyBlock<TInput,string>可以用来做 split ( GetInputPathsBlock ),并且可以链接到一个普通的 TransformBlock<string,InputData> ( LoadDataBlock ),然后从那里到另一个 TransformBlock<InputData,ProcessedData> ( ProcessDataBlock ),但我不知道如何将其合并回单个块。

我看到了什么

我找到了 this answer , 使用 TransformManyBlockIEnumerable<item> 开始至 item ,但我不完全理解它,我无法链接 TransformBlock<InputData,ProcessedData> ( ProcessDataBlock ) 到 TransformBlock<IEnumerable<ProcessedData>>,ProcessedData> ,所以我不知道如何使用它。

我也看到了答案 like this ,建议使用 JoinBlock ,但输入文件的数量 N 各不相同,无论如何,文件都以相同的方式加载。

还有 this answer ,这似乎是我想要的,但我不完全理解它,我不知道如何将字典的设置转移到我的案例中。

如何拆分和合并我的数据流?
  • 是否有我缺少的块类型
  • 我可以以某种方式使用 TransformManyBlock两次?
  • tpl 对拆分/合并是否有意义还是有更简单的异步/等待方式?
  • 最佳答案

    我会使用嵌套块来避免拆分我的每月数据,然后不得不再次合并它们。这是两个嵌套 TransformBlock 的示例s 处理 2020 年的所有日子:

    var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
    {
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
    await Task.Delay(100); // Simulate async work
    return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
    await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

    foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
    monthlyBlock.Complete();

    为了收集内部区块的每日结果,我使用了扩展方法 ToListAsync如下所示:
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
    {
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
    while (block.TryReceive(out var item))
    {
    list.Add(item);
    }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
    }

    关于c# - 如何拆分和合并此数据流管道?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58714155/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com