- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
上一篇博客 分享了使用 Channel 来实现针对大量数据的多线程异步处理,感谢大哥们在评论中提出的宝贵的问题和建议!本篇将分享使用 ActionBlock 如何实现,欢迎在评论区留言讨论.
ActionBlock是 .NET 中 TPL Dataflow 库的一部分,用于处理数据流和并行任务。它提供了一种简单而强大的方式来处理并行任务,并且可以轻松地实现生产者-消费者模式.
ActionBlock
可以配置为并行处理多个任务,从而提高处理效率ActionBlock
,消费者从ActionBlock
中读取数据并进行处理使用ActionBlock非常简单,主要步骤如下:
SendAsync
方法将数据发送到 ActionBlockComplete
方法通知 ActionBlock 不再接收新的数据Completion
属性等待所有数据处理完成以下是一个简单的示例代码,展示了如何使用 ActionBlock:
using System.Threading.Tasks.Dataflow;
var actionBlock = new ActionBlock<int>(async item =>
{
// 模拟异步处理
await Task.Delay(100);
Console.WriteLine($"Processed item: {item}");
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4 // 设置最大并行度
});
// 发送数据到 ActionBlock
for (int i = 0; i < 10; i++)
{
await actionBlock.SendAsync(i);
}
// 完成 ActionBlock
actionBlock.Complete();
// 等待处理完成
await actionBlock.Completion;
Console.WriteLine("All items processed.");
假设我们有一组数据需要经过两个步骤的处理。每个数据项都需要进行初步处理,然后进行进一步处理。希望步骤2可以在步骤1产生结果数据后立即开始处理,而不是等待步骤1完全处理完毕.
使用TransformBlock和ActionBlock来实现生产者-消费者模式。生产者负责读取数据并将其发送到TransformBlock中,消费者从TransformBlock中读取数据并进行处理。 以下是一个简单的示例代码,演示如何使用TransformBlock和ActionBlock实现生产者-消费者模式来处理数据:
using System.Threading.Tasks.Dataflow;
var cts = new CancellationTokenSource();
// 假设有一组数据
var dataItems = Enumerable.Range(0, 1000).Select(x => $"data_{x}").ToList();
var processor = new DataProcessor(10, cts.Token);
await processor.ProcessAsync(dataItems);
Console.ReadKey();
/// <summary>
/// 数据处理器
/// </summary>
public class DataProcessor(int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
public async Task ProcessAsync(List<string> dataItems)
{
// 创建一个 TransformBlock 用于步骤1的处理,并将结果发送到步骤2的 ActionBlock
var step1Block = new TransformBlock<string, string>(async dataItem => await Step1(dataItem), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
CancellationToken = cancellationToken
});
// 创建一个 ActionBlock 用于步骤2的处理
var step2Block = new ActionBlock<string>(async dataItem =>
{
await Step2(dataItem);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
CancellationToken = cancellationToken
});
// 将 TransformBlock 链接到 ActionBlock
step1Block.LinkTo(step2Block, new DataflowLinkOptions { PropagateCompletion = true });
// 启动多个步骤1的任务(生产者)
foreach (var dataItem in dataItems)
{
await step1Block.SendAsync(dataItem, cancellationToken);
}
// 完成步骤1的 TransformBlock 的写入
step1Block.Complete();
// 等待步骤1的 TransformBlock 处理完成
await step1Block.Completion;
// 完成步骤2的 ActionBlock 的写入
step2Block.Complete();
// 等待步骤2的 ActionBlock 处理完成
await step2Block.Completion;
}
private async Task<string> Step1(string dataItem)
{
// 模拟步骤1的处理(如初步处理数据)
await Task.Delay(10, cancellationToken);
Console.WriteLine($"Step1 processed data item: {dataItem}");
return dataItem;
}
private async Task Step2(string dataItem)
{
// 模拟步骤2的处理(如进一步处理数据)
await Task.Delay(10, cancellationToken);
Console.WriteLine($"Step2 processed data item: {dataItem}");
}
}
代码解释
ProcessAsync
方法中,我们首先创建了一个 TransformBlock,用于Step1的处理,TransformBlock 接受一个输入数据项,进行处理后返回一个输出数据项,TransformBlock<string, string>
表示输入和输出都是string
类型ActionBlock<string>
表示输入是string
类型LinkTo
方法将两个块连接起来,并设置PropagateCompletion
为 true,表示当 TransformBlock 完成时,ActionBlock 也会完成SendAsync
方法将数据项发送到 TransformBlockComplete
方法通知 TransformBlock 不再接收新的数据,并使用Completion
属性等待所有数据处理完成。然后完成Step2的 ActionBlock 的写入,并等待Step2的 ActionBlock 处理完成最后此篇关于在.NETCore中使用ActionBlock实现高效率的多步骤数据处理的文章就讲到这里了,如果你想了解更多关于在.NETCore中使用ActionBlock实现高效率的多步骤数据处理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
解释 我的应用程序基本上使用带有代表建筑物的多边形叠加层的 map View ,以及注释。为此,它导入了一个名为 Annotation 的自定义类,该类在点击注释时处理弹出详细信息,这意味着它存储建筑
我有一个数据处理问题,我想计算两支球队在不同比赛中的进球数差异。数据如下所示: matchId teamId eventSec 1 2799331 6718 443.55984
如下所示: ? 1
我想知道 cocoa 是否有默认的数据存储方式。如果是,那是什么?我的意思是 Rails 默认使用 sqlite... 此外,我正在寻找有关如何使用它的教程...例如获取数据并将其显示到 ListVi
我正在使用 HTML5 创建在线游戏。我将使用 JSON 字符串通过 Websockets 通信数据,因此典型的字符串将包含被调用的操作以及随之而来的数据: {action: "chat", user
我需要按特定列对一组 csv 行进行分组,并对每个组进行一些处理。 JavaRDD lines = sc.textFile ("somefile
我有一个情况: 基本上有 3 个模块,分别命名为“A”、“B”、“C”。每个模块都涉及多线程。 模块“A”获取高速数据(20ms)并发送。模块“B”的一个线程启动。 模块“B”提取相关数据并执行一些位
我正在处理有关城镇和城镇内区域的 MySQL 数据库中的一些数据。 数据库看起来像这样 ID | NAME 1 | Manchester 2 | Manchester/North 3 | Man
当我注册用户时,我得到一个状态代码 200 和一个 token :“”返回 JSON。如果用户已经存在,那么我会得到状态代码 200 和 html 响应而不是 JSON。我应该如何处理这个问题。提前致
我有一个应用程序,我从网络上下载大量资源,并对每个资源进行一些处理。我不希望这项工作发生在主线程上,但它非常轻量级且优先级较低,因此所有这些工作都可以真正发生在同一个共享工作线程上。这似乎是一件好事,
我目前正在与一家小公司合作,该公司将其所有应用程序数据存储在 AWS Redshift 集群中。我的任务是对该 Redshift 集群中的数据进行一些数据处理和机器学习。 我需要做的第一个任务是根据一
简介 有些 post 的请求参数是 json 格式的,这个前面发送post 请求里面提到过,需要导入 json模块处理。现在企业公司一般常见的接口因为json数据容易处理,所以绝大多数返回数据也是
1.数组的处理: 1.1 数组的创建和初始化: 1.arrary()函数创建数组,默认情况下0元素是数组的第一个元素, count()和sizeof()函数获得数
我正在尝试将 CKEditor 与 AngularJS 结合使用,用于具有数据绑定(bind)的 WYSIWYG 编辑器,一切似乎都运行良好。极端的可配置性对我们的需求匹配有很大帮助。 我们现在面临表
我正在对负样本和正样本进行文本二元分类任务,我想包括以下所有内容: 处理数据..(例如标记化) 特征选择,例如 Chi2 应用随机投影,因为我有一个大型稀疏矩阵(n_samples:974,n_fea
数据与我在 Pandas 系列: data = ["1. stock1 (1991)", "3. stock13 (1993)", "5. stock19 (1999)", "89. stock
1.字符串的定义与显示 定义:通过””,''来标志 显示:echo()和print(),但print()具有返回值值,1,而echo()没有,但echo比print()要快,
1.正则表达式基础知识 含义:由普通字符和(a-z)和一些特殊字符组成的字符串模式 功能:有效性验证。 替换文本。 从一个字符串提取一个子字符串。&n
我想知道是否可以在我的 Cost Explorer 中发现这个成本背后的资源,按使用类型分组我可以看到它是数据处理字节,但我不知道哪个资源会消耗这个数量数据的。知道如何在 CloudWatch 上发现
我有一个 Json 字符串。我想从该 Json 字符串中获取值。 这是我的 json 字符串{“纬度”:“22.5712854”},{“经度”:“88.4266847”} 我只需要使用 TSQL 查询
我是一名优秀的程序员,十分优秀!