gpt4 book ai didi

c# - Blocked on 数据流的 block 设计

转载 作者:行者123 更新时间:2023-11-30 23:14:12 25 4
gpt4 key购买 nike

我们有一个数据处理管道,我们正在尝试使用 TPL Dataflow框架。

管道的基本要点:

  1. 遍历文件系统上的 CSV 文件 (10,000)
  2. 验证我们没有导入内容,如果我们忽略
  3. 遍历单个 CSV 文件(20,000-120,000 行)的内容并创建适合我们需求的数据结构。
  4. 将这些新的 dataStructured 项中的 100 个批量化,并将它们推送到数据库中
  5. 将 CSV 文件标记为正在导入。

现在我们有一个现有的 Python 文件,它以非常缓慢和痛苦的方式执行上述所有操作 - 代码一团糟。

我的想法是以下查看 TPL Dataflow .

  1. BufferBlock<string>将所有文件发布到
  2. TransformBlock<string, SensorDataDto>检测是否导入该文件的谓词
  3. TransformBlock<string, SensorDataDto>读取 CSV 文件并创建 SensorDataDto结构
  4. BatchBlock<SensorDataDto> TransformBlock 中使用委托(delegate)批处理 100 个请求。

    4.5。 ActionBlock<SensorDataDto>将 100 条记录推送到数据库中。

  5. ActionBlock将 CSV 标记为已导入。

我已经创建了前几个操作并且它们正在工作(BufferBlock -> TransformBlock + Predicate && Process if hasn't)但我不确定如何继续流程以便我可以将 100 张贴到 BatchBlockTransformBlock 内并连接以下操作。

这看起来正确吗 - 基本要点,我该如何处理 BufferBlock TPL 数据流动方式中的位?

bufferBlock.LinkTo(readCsvFile, ShouldImportFile)
bufferBlock.LinkTo(DataflowBlock.NullTarget<string>())
readCsvFile.LinkTo(normaliseData)
normaliseData.LinkTo(updateCsvImport)
updateCsvImport.LinkTo(completionBlock)

batchBlock.LinkTo(insertSensorDataBlock)

bufferBlock.Completion.ContinueWith(t => readCsvFile.Complete());
readCsvFile.Completion.ContinueWith(t => normaliseData.Complete());
normaliseData.Completion.ContinueWith(t => updateCsvImport.Complete());
updateCsvImport.Completion.ContinueWith(t => completionBlock.Complete());

batchBlock.Completion.ContinueWith(t => insertSensorDataBlock.Complete());

normaliseData里面我调用的方法 BatchBlock.Post<..>(...) , 这是一个好的模式还是应该以不同的方式构建?我的问题是我只能在所有记录都被推送后将文件标记为正在导入。

Task.WhenAll(bufferBlock.Completion, batchBlock.Completion).Wait();

如果我们有一批 100 ,如果80怎么办?被插入,有没有办法排干最后一个80

我不确定是否应该链接 BatchBlock在主管道中,我会等到两者都完成。

最佳答案

首先,您不需要使用 Completion在这方面,您可以使用 PropagateCompletion 链接期间的属性:

// with predicate
bufferBlock.LinkTo(readCsvFile, new DataflowLinkOptions { PropagateCompletion = true }, ShouldImportFile);
// without predicate
readCsvFile.LinkTo(normaliseData, new DataflowLinkOptions { PropagateCompletion = true });

现在,回到您的批处理问题。也许,你可以使用 JoinBlock<T1, T2> BatchedJoinBlock<T1, T2> 在这里,通过将它们附加到您的管道并收集连接的结果,您可以全面了解正在完成的工作。也许你可以实现自己的 ITargetBlock<TInput>这样您就可以按照自己的方式使用消息。

根据official docs ,这些 block 是贪婪的,并且一旦可用就从链接的 block 收集数据,所以如果一个目标准备好而另一个没有准备好,或者批处理 block 有 80%,连接 block 可能会卡住。批量大小,所以你需要把它记在心里。在您自己实现的情况下,您可以使用 ITargetBlock<TInput>.OfferMessage 从您的来源获取信息的方法。

BatchBlock<T> is capable of executing in both greedy and non-greedy modes. In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches.

In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock<T> can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.

关于c# - Blocked on 数据流的 block 设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43255432/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com