- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
简单整理一下paralel,以上是并行的意思.
我们在工作中常常使用task await 和 async,也就是将线程池进行了封装,那么还有一些更高级的应用.
是对task的封装,那么来看下paralel.
static void Main(string[] args)
{
var ints= Enumerable.Range(1, 100);
var result = Parallel.ForEach(ints, arg =>
{
Console.WriteLine(arg);
});
Console.Read();
}
可以看到结果是并行的.
那么来看下实现机制.
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}
if (body == null)
{
throw new ArgumentNullException(nameof(body));
}
return ForEachWorker<TSource, object>(
source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}
进行参数检验,然后交给了ForEachWorker.
这是一个基本的代码思路,就是复杂的方法中可以先校验参数,然后具体实现交给另外一个方法.
然后通过不同的类型,进行分类
然后看下具体实现是什么?
进去看就是一个taskreplicator
看下run在做什么.
public static void Run<TState>(ReplicatableUserAction<TState> action, ParallelOptions options, bool stopOnFirstFailure)
{
int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;
TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();
Replica nextReplica;
while (replicator._pendingReplicas.TryDequeue(out nextReplica))
nextReplica.Wait();
if (replicator._exceptions != null)
throw new AggregateException(replicator._exceptions);
}
创建了一个taskreplictor,起到管理作用 。
然后创建了一个Replica,然后这个start 是关键 。
然后通过while,让每一个Replica 都运行完毕才推出,达到同步的效果 。
if (replicator._exceptions != null)
throw new AggregateException(replicator._exceptions);
可以看一下这个,这个是一个比较好的技巧。如果一个运行管理,不用抛出异常,之间在管理中进行运行处理总结.
比如结果,异常等.
那么就看下这个start.
protected Replica(TaskReplicator replicator, int maxConcurrency, int timeout)
{
_replicator = replicator;
_timeout = timeout;
_remainingConcurrency = maxConcurrency - 1;
_pendingTask = new Task(s => ((Replica)s).Execute(), this);
_replicator._pendingReplicas.Enqueue(this);
}
public void Start()
{
_pendingTask.RunSynchronously(_replicator._scheduler);
}
将会运行Execute,是同步的,而不是异步的,也就是说第一个task将会运行在当前线程.
那么看Execute在做什么?
public void Execute()
{
try
{
if (!_replicator._stopReplicating && _remainingConcurrency > 0)
{
CreateNewReplica();
_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
}
bool userActionYieldedBeforeCompletion;
ExecuteAction(out userActionYieldedBeforeCompletion);
if (userActionYieldedBeforeCompletion)
{
_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
_pendingTask.Start(_replicator._scheduler);
}
else
{
_replicator._stopReplicating = true;
_pendingTask = null;
}
}
catch (Exception ex)
{
LazyInitializer.EnsureInitialized(ref _replicator._exceptions).Enqueue(ex);
if (_replicator._stopOnFirstFailure)
_replicator._stopReplicating = true;
_pendingTask = null;
}
}
一段一段分析:
if (!_replicator._stopReplicating && _remainingConcurrency > 0)
{
CreateNewReplica();
_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
}
这里当_replicator 也就是任务复制器没有停止的时候。这里有两种情况会停止,一种是任务完成,一种是任务异常且设置参数异常时候停止.
_remainingConcurrency 指的是副本数,默认是int.max.
那么就复制一个副本.
protected override void CreateNewReplica()
{
Replica<TState> newReplica = new Replica<TState>(_replicator, _remainingConcurrency, GenerateCooperativeMultitaskingTaskTimeout(), _action);
newReplica._pendingTask.Start(_replicator._scheduler);
}
复制完副本后,那么就开始运行我们的action了.
protected override void ExecuteAction(out bool yieldedBeforeCompletion)
{
_action(ref _state, _timeout, out yieldedBeforeCompletion);
}
这里传入了timeout,这个timeout并不是我们限制我们单个task的运行时间,而是当运行到一定时候后,这个task就停止运行,然后另外启动一个副本.
if (CheckTimeoutReached(loopTimeout))
{
replicationDelegateYieldedBeforeCompletion = true;
break;
}
if (userActionYieldedBeforeCompletion)
{
_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
_pendingTask.Start(_replicator._scheduler);
}
else
{
_replicator._stopReplicating = true;
_pendingTask = null;
}
这个是为了符合操作系统的调度思想,跑的越久的,基本上优先级会低些.
那么看下这个_action主要在做什么吧.
while (myPartition.MoveNext())
{
KeyValuePair<long, TSource> kvp = myPartition.Current;
long index = kvp.Key;
TSource value = kvp.Value;
// Update our iteration index
if (state != null) state.CurrentIteration = index;
if (simpleBody != null)
simpleBody(value);
else if (bodyWithState != null)
bodyWithState(value, state);
else if (bodyWithStateAndIndex != null)
bodyWithStateAndIndex(value, state, index);
else if (bodyWithStateAndLocal != null)
localValue = bodyWithStateAndLocal(value, state, localValue);
else
localValue = bodyWithEverything(value, state, index, localValue);
if (sharedPStateFlags.ShouldExitLoop(index)) break;
// Cooperative multitasking:
// Check if allowed loop time is exceeded, if so save current state and return.
// The task replicator will queue up a replacement task. Note that we don't do this on the root task.
if (CheckTimeoutReached(loopTimeout))
{
replicationDelegateYieldedBeforeCompletion = true;
break;
}
}
就是拉取我们的enumerator的数据,然后simpleBody(value),进行运行我们写的action.
总结一下,其实Parallel 核心就是一个任务复制器,然后创建多个副本,拉取我们的数据,进行执行我们设置的action.
里面的主要功能,Parallel做到了限制副本数,因为我们知道task并不是越多越好.
第二个,如果长时间运行,那么Parallel是做了优化的,当达到timeout的时候,那么会重新启动一个副本(可以理解为一个线程) 。
第三点,Parallel 有一个foreach 进行迭代器的处理,这里不仅仅是让任务可以并行.
而且具备c# foreach的基本功能.
static void Main(string[] args)
{
var ints= Enumerable.Range(1, 100);
var result = Parallel.ForEach(ints, (arg, state)
=>
{
if (state.IsStopped)
{
return;
}
if (arg > 18)
{
state.Break();
}
});
if (result.IsCompleted)
{
Console.WriteLine("完成");
}
Console.Read();
}
可以进行中断.
还有一个函数,那就是stop,这个stop 比break 停止的快,break 要记录出,最小中断位置.
而stop 就是立马停止下来.
在上述中,我们知道可以传递一个taskschedule进行,那么这个taskschedule 是干什么的,对我们的任务调度有什么影响呢? 下一节,自我实现taskschedule.
最后此篇关于c#异步进阶————paralel[二]的文章就讲到这里了,如果你想了解更多关于c#异步进阶————paralel[二]的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
MySQL表的增删改查(进阶) 1. 数据库约束 约束类型 说明 示例 NULL约束 使用NOT NULL指定列不为空 name varchar(20) not null, UNIQUE唯一约束 指定
多线程(进阶) 1. 常见的锁策略 1.1 乐观锁 悲观锁 乐观锁 : 总是假设最好的情况,每次去拿数据的时候都认为别人不会修改数据,但是在对数据提交更新的时候,再去判断这个数据在这个期间是否有别人对
我相信在正确编码的系统中-错误(作为错误或异常)应该是不可能的(DB/memcached服务器故障导致查询失败)。我们的代码不应依赖任何假设才能正常工作,并且应尽可能地证明其正确性。 但是,为了确保我
1. 前言 泛型代码让你能根据你所定义的要求写出可以用于任何类型的灵活的、可复用的函数。你可以编写出可复用、意图表达清晰、抽象的代码。 泛型是 Swift 最强大
一、创建质量配置及关联项目 1.新建一个java代码质量配置 2.为配置添加规则 确认有4条规则了 为项目更换扫描配置 二、创建质量阈关联项目 1.
完整jenkinsfile 、sharelibrary 及jenkins配置见最后 一、gitlab push分支自动匹配 1.添加Generic Webhook插件参数,获取本次提交的分支信息
1.gitlab创建新应用 2.jenkins安装gitlab插件 3.插件安装完成后全局安全配置中使用并配置gitlab认证 4.注销重新登录后自动使用gitlab当前登录
一、部署jenkins master 1.创建Deployment YAML文件 apiVersion: apps/v1 kind: Deployment metadata: name: je
一、docker安装nexus wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo yum clean all
一、新建library文件 build.groovy package org.devops // 构建类型 def Build(buildType,buildShell){
一、制品获取 1.安装及配置插件 配置插件(jenkins项目中) 2.选择对应的制品 3.修改jenkins file // 新增以下代码 String artifactU
1.github创建OAuth 2.jenkins安装并配置github认证插件 jenkins配置使用github认证 3.注销重新登录
一、添加测试Maven项目 1.新建一个gitlab项目 2.导入simple-java-maven-app仓库代码(可以去github或者Gittree上都有) 3.配置mvn 国内源
一、添加AnsiColor插件 二、查看插件语法 1.打开任意pipline项目配置,找到流水线语法,并点击 跳转连接,选择插件,查看帮助 三、修改sharelibrary脚本,优
一、Pipeline概念 1 node/agent(节点) 节点是一个机器,可以是Jenkins的master节点也可以是slave节点。通过node指定当前job运行的机器(这个是脚本式语法)。
一、插件备份和恢复 1.安装备份插件 重启系统后查看 2.配置周期备份 点击进入,点击Settings Backup only builds marked to keep
一、.部署LDAP 这里使用容器部署,手动部署参考:https://www.cnblogs.com/panwenbin-logs/p/16101045.html 1.安装docker wget -
由于sonarqube开源版本不支持多分支管理,在扫描所有分支的时候都会指定同一个sonar项目,不便于我们查看 一、下载开源插件 项目地址:https://github.com/mc1arke/
一、手动测试 注意此版本已经内置包含Java语言扫描插件,不再需要单独安装 1.clone代码 git clone git@192.168.1.128:root/demo-maven-serv
我有下一种情况。 从 PHP 表单中我只获得公司 ID 我需要使用该公司 ID 排列所有用户名 我需要数组并将具有该用户名的所有日志导出到表 我的问题是,当我尝试下一步时: $sql2 = "SELE
我是一名优秀的程序员,十分优秀!