I have created a TPL Dataflow pipeline that consists of 3 TransformBlocks and an ActionBlock at the end.
var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database
var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));
loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);
validateData.LinkTo(importJob, job => job.ReturnCode == 100);
validateData.LinkTo(validationFailed);
importJob.LinkTo(reportImport);
Each block fills the Job object with its processed data, because I need not just the data itself but also general information about the job that I later need for the response message. If everything goes well, I basically feed in an XML path and get back a response object containing that information.
If two or more files take some time to read from the HDD, how can I make the pipeline read them in parallel and asynchronously while keeping the order in which they came in? If file 1 takes longer, file 2 has to wait until file 1 is done before its data is passed to the next block, which should then also validate the data in parallel and asynchronously, again preserving the order for the block after it.
Right now it looks as if all files are processed sequentially, even though I call SendAsync on the head block.
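For reference, this is a minimal sketch of how I understand the options for the read stage would have to look: the delegate runs in parallel, but because EnsureOrdered defaults to true on ExecutionDataflowBlockOptions (it is set explicitly here only for clarity, and the property requires a recent version of System.Threading.Tasks.Dataflow), results are still handed to the next block in the order the jobs were posted. The values are illustrative, not a recommendation:

var readOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4, // read several files from disk concurrently
    BoundedCapacity = 16,       // throttle how many jobs may queue up
    EnsureOrdered = true        // emit results in posting order (the default)
};

var loadXml = new TransformBlock<Job, Job>(job => { /* read the file */ return job; }, readOptions);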
EDIT: I wrote a small test class for the pipeline. It has 3 stages. What I want to achieve is a first TransformBlock that keeps reading files as they come in (via SendAsync from a FileSystemWatcher) and, once done, outputs them in the order the files arrived. Meaning: if File1 is a large file and File2+3 come in, both get read while File1 is still being processed, but File2+3 have to wait until File1 has been read before they can be sent on to the second TransformBlock. Stage 2 should work the same way. Stage 3, on the other hand, takes the objects produced from File1 and saves them to the database, which can be done in parallel and asynchronously. However, the objects from File1 need to be processed before those from File2 and File3, so the contents of a whole file have to be processed in the order in which the files arrived. I tried to achieve this by limiting the third TransformBlock with MaxDegreeOfParallelism and BoundedCapacity both set to 1, but that seems to fail and does not really preserve the order in the Console.WriteLine output.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing
{
    public class Job
    {
        public string Path { get; set; }
        public XmlDocument Document { get; set; }
        public List<Object> BusinessObjects { get; set; }
        public int ReturnCode { get; set; }
        public int ID { get; set; }
    }

    public class Test
    {
        ITargetBlock<Job> pathBlock = null;
        CancellationTokenSource cancellationTokenSource;
        Random rnd = new Random();

        private bool ReadDocument(Job job)
        {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job)
        {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job)
        {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO =>
            {
                ImportObject(bO);
            });

            // Import the job
            return true;
        }

        private object ImportObject(object o)
        {
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            return new object();
        }

        private void CreateResponse(Job job)
        {
            if(job.ReturnCode == 100)
            {
                Console.WriteLine("ID {0} was successfully imported.", job.ID);
            }
            else
            {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline()
        {
            var loadXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ReadDocument(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }
                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ValidateXml(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }
                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var importJob = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ProcessJob(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }
                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, ActionBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);
            validateXml.LinkTo(importJob, job => job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);
            importJob.LinkTo(reportImport);

            // Return the head of the network.
            return loadXml;
        }

        public void Start()
        {
            cancellationTokenSource = new CancellationTokenSource();
            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id)
        {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel()
        {
            if(cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program
    {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args)
        {
            _Pipeline = new Test();
            _Pipeline.Start();

            var data = Enumerable.Range(1, 100);

            foreach(var d in data)
                _Pipeline.AddJob(InputXml, d);

            // Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}
EDIT2: After changing BoundedCapacity to Unbounded, I get everything back in the order it was sent into the pipeline, so it was not really broken before, but I guess messages were being dropped? If I have understood EnsureOrdered correctly and use MaxDegreeOfParallelism = 8 on the last TransformBlock, the items are no longer in order, as you can see in the output section below. But this is exactly where they need to be in order, because I save the data to the database there and that has to happen in the order the files came in. It does not matter if the items leave the last TransformBlock out of order, so I guess I cannot keep the parallelism here?
ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14
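For comparison, one way to keep the third stage parallel while still saving to the database in arrival order would be to leave the parallel TransformBlock at its default EnsureOrdered = true and move the actual save into a downstream block with MaxDegreeOfParallelism = 1. This is only a sketch; the block and method names are illustrative and not part of my actual code:

var processJob = new TransformBlock<Job, Job>(job =>
{
    // Parallel part: delegates run concurrently, so log lines interleave here.
    return job;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 32 });

var saveToDatabase = new ActionBlock<Job>(job =>
{
    // Sequential part: jobs arrive here in the order they entered processJob,
    // because the TransformBlock's EnsureOrdered option is true by default.
    SaveJob(job); // hypothetical database save
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

processJob.LinkTo(saveToDatabase, new DataflowLinkOptions { PropagateCompletion = true });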
EDIT 3: Output after using @JSteward's latest code.
ReadDocument 09:01:03.9363340 JobId: 1
ReadDocument 09:01:03.9368357 JobId: 5
ReadDocument 09:01:03.9373347 JobId: 6
ReadDocument 09:01:03.9368357 JobId: 8
ReadDocument 09:01:03.9363340 JobId: 4
ReadDocument 09:01:03.9373347 JobId: 3
ReadDocument 09:01:03.9373347 JobId: 7
ReadDocument 09:01:03.9368357 JobId: 2
ReadDocument 09:01:05.2037570 JobId: 9
ReadDocument 09:01:05.3108413 JobId: 10
ReadDocument 09:01:05.5678177 JobId: 11
ReadDocument 09:01:05.6308763 JobId: 12
ValidateXml 09:01:05.6338782 JobId: 1
ValidateXml 09:01:06.3754174 JobId: 2
ReadDocument 09:01:06.3764184 JobId: 13
ReadDocument 09:01:06.3764184 JobId: 14
ReadDocument 09:01:07.3756634 JobId: 15
ReadDocument 09:01:07.3756634 JobId: 18
ValidateXml 09:01:07.3756634 JobId: 3
ValidateXml 09:01:07.3756634 JobId: 4
ReadDocument 09:01:07.3756634 JobId: 17
ReadDocument 09:01:07.3756634 JobId: 16
ReadDocument 09:01:08.3753887 JobId: 19
ReadDocument 09:01:08.3753887 JobId: 20
ValidateXml 09:01:08.3753887 JobId: 5
ProcessJob 09:01:08.3763906 JobId: 1
ReadDocument 09:01:09.3744411 JobId: 21
ReadDocument 09:01:09.3749410 JobId: 24
ProcessJob 09:01:09.3749410 JobId: 2
ReadDocument 09:01:09.3749410 JobId: 22
ReadDocument 09:01:09.3749410 JobId: 23
ReadDocument 09:01:10.3752061 JobId: 25
ReadDocument 09:01:10.3752061 JobId: 27
ValidateXml 09:01:10.3752061 JobId: 6
ValidateXml 09:01:10.3752061 JobId: 7
ValidateXml 09:01:10.3752061 JobId: 8
ReadDocument 09:01:10.3752061 JobId: 26
ReadDocument 09:01:11.3759294 JobId: 29
ReadDocument 09:01:11.3759294 JobId: 28
ValidateXml 09:01:11.3764278 JobId: 10
ReadDocument 09:01:11.3759294 JobId: 31
ValidateXml 09:01:11.3759294 JobId: 9
ReadDocument 09:01:11.3759294 JobId: 30
ValidateXml 09:01:12.3751553 JobId: 11
ReadDocument 09:01:12.3751553 JobId: 33
ValidateXml 09:01:12.3751553 JobId: 12
ReadDocument 09:01:12.3751553 JobId: 34
ReadDocument 09:01:12.3751553 JobId: 32
ValidateXml 09:01:13.3753842 JobId: 13
ValidateXml 09:01:13.3753842 JobId: 14
ValidateXml 09:01:13.3753842 JobId: 16
ReadDocument 09:01:13.3753842 JobId: 35
ReadDocument 09:01:13.3753842 JobId: 36
ValidateXml 09:01:13.3753842 JobId: 15
ReadDocument 09:01:14.3756414 JobId: 37
ValidateXml 09:01:14.3756414 JobId: 19
ValidateXml 09:01:14.3756414 JobId: 18
ValidateXml 09:01:14.3756414 JobId: 17
ReadDocument 09:01:14.3756414 JobId: 40
ReadDocument 09:01:14.3756414 JobId: 38
ReadDocument 09:01:14.3756414 JobId: 39
ProcessJob 09:01:14.3761419 JobId: 3
SendToDataBase 09:01:14.3806453 JobId: 1
SendToDataBase 09:01:14.3821472 JobId: 2
ProcessJob 09:01:14.3821472 JobId: 4
ValidateXml 09:01:15.3763758 JobId: 20
ReadDocument 09:01:15.3763758 JobId: 42
ValidateXml 09:01:15.3763758 JobId: 21
ReadDocument 09:01:15.3773772 JobId: 43
ReadDocument 09:01:15.3763758 JobId: 41
ValidateXml 09:01:15.3768800 JobId: 22
ReadDocument 09:01:15.3773772 JobId: 44
ValidateXml 09:01:16.3761117 JobId: 23
ValidateXml 09:01:16.3761117 JobId: 26
ValidateXml 09:01:16.3761117 JobId: 24
ValidateXml 09:01:16.3761117 JobId: 25
ReadDocument 09:01:16.3761117 JobId: 45
ReadDocument 09:01:16.3761117 JobId: 46
ProcessJob 09:01:16.3761117 JobId: 5
ReadDocument 09:01:17.3758334 JobId: 47
ValidateXml 09:01:17.3763315 JobId: 28
ValidateXml 09:01:17.3763315 JobId: 27
ReadDocument 09:01:17.3763315 JobId: 49
ReadDocument 09:01:17.3763315 JobId: 48
ProcessJob 09:01:17.3763315 JobId: 6
ValidateXml 09:01:17.3763315 JobId: 29
ReadDocument 09:01:17.3763315 JobId: 50
ReadDocument 09:01:18.3755786 JobId: 51
ReadDocument 09:01:18.3755786 JobId: 52
<<<
ProcessJob 09:01:18.3770792 JobId: 10
ProcessJob 09:01:18.3770792 JobId: 9
ProcessJob 09:01:18.3755786 JobId: 7
>>>
ReadDocument 09:01:18.3755786 JobId: 53
ValidateXml 09:01:18.3755786 JobId: 32
ValidateXml 09:01:18.3755786 JobId: 31
ValidateXml 09:01:18.3755786 JobId: 30
ReadDocument 09:01:18.3760794 JobId: 54
ProcessJob 09:01:18.3755786 JobId: 8
ValidateXml 09:01:19.3753274 JobId: 34
ValidateXml 09:01:19.3753274 JobId: 33
ReadDocument 09:01:19.3758261 JobId: 56
ReadDocument 09:01:19.3758261 JobId: 55
ValidateXml 09:01:19.3758261 JobId: 35
ValidateXml 09:01:20.3752782 JobId: 36
ValidateXml 09:01:20.3752782 JobId: 37
ProcessJob 09:01:20.3757709 JobId: 11
ReadDocument 09:01:20.3752782 JobId: 57
ValidateXml 09:01:20.3752782 JobId: 38
ReadDocument 09:01:20.3757709 JobId: 58
ReadDocument 09:01:20.3757709 JobId: 59
ProcessJob 09:01:21.3757202 JobId: 12
ValidateXml 09:01:21.3757202 JobId: 39
ReadDocument 09:01:21.3757202 JobId: 62
ReadDocument 09:01:21.3757202 JobId: 61
ReadDocument 09:01:21.3757202 JobId: 60
ReadDocument 09:01:22.3764154 JobId: 63
ReadDocument 09:01:22.3764154 JobId: 64
ReadDocument 09:01:22.3764154 JobId: 65
ProcessJob 09:01:22.3794167 JobId: 16
ValidateXml 09:01:22.3764154 JobId: 40
ValidateXml 09:01:22.3764154 JobId: 42
ReadDocument 09:01:22.3764154 JobId: 66
ValidateXml 09:01:22.3774149 JobId: 43
ProcessJob 09:01:22.3764154 JobId: 13
ValidateXml 09:01:22.3764154 JobId: 41
ProcessJob 09:01:22.3779160 JobId: 15
SendToDataBase 09:01:22.3784159 JobId: 3
ProcessJob 09:01:22.3764154 JobId: 14
ValidateXml 09:01:22.3859209 JobId: 44
SendToDataBase 09:01:22.4309993 JobId: 4
SendToDataBase 09:01:22.4460051 JobId: 5
SendToDataBase 09:01:22.4465047 JobId: 6
ReadDocument 09:01:23.3760112 JobId: 67
ValidateXml 09:01:23.3760112 JobId: 46
ValidateXml 09:01:23.3760112 JobId: 47
ReadDocument 09:01:23.3760112 JobId: 68
ValidateXml 09:01:23.3760112 JobId: 45
ProcessJob 09:01:23.3760112 JobId: 17
ValidateXml 09:01:24.3762581 JobId: 48
ReadDocument 09:01:24.3762581 JobId: 69
ProcessJob 09:01:24.3762581 JobId: 18
ProcessJob 09:01:24.3762581 JobId: 19
ReadDocument 09:01:24.3762581 JobId: 70
CreateResponse 09:01:24.3777606 JobId: 58
CreateResponse 09:01:24.3994684 JobId: 59
CreateResponse 09:01:24.4059908 JobId: 60
CreateResponse 09:01:24.4114777 JobId: 61
CreateResponse 09:01:24.4134789 JobId: 62
ValidateXml 09:01:25.3759607 JobId: 49
ValidateXml 09:01:25.3759607 JobId: 51
ProcessJob 09:01:25.3784627 JobId: 22
ValidateXml 09:01:25.3759607 JobId: 52
ProcessJob 09:01:25.3759607 JobId: 20
ValidateXml 09:01:25.3774629 JobId: 53
ValidateXml 09:01:25.3759607 JobId: 50
ValidateXml 09:01:25.3774629 JobId: 54
ReadDocument 09:01:25.3759607 JobId: 72
ReadDocument 09:01:25.3774629 JobId: 73
ReadDocument 09:01:25.3759607 JobId: 71
ReadDocument 09:01:25.3779625 JobId: 74
ProcessJob 09:01:25.3759607 JobId: 21
SendToDataBase 09:01:25.3774629 JobId: 7
CreateResponse 09:01:25.3759607 JobId: 39
SendToDataBase 09:01:25.4398495 JobId: 8
SendToDataBase 09:01:25.4448555 JobId: 9
SendToDataBase 09:01:25.4478565 JobId: 10
SendToDataBase 09:01:25.4483570 JobId: 11
CreateResponse 09:01:25.4448555 JobId: 42
CreateResponse 09:01:25.4608868 JobId: 43
SendToDataBase 09:01:25.4553682 JobId: 12
CreateResponse 09:01:25.4613665 JobId: 44
CreateResponse 09:01:25.4698849 JobId: 45
ReadDocument 09:01:26.3754874 JobId: 75
ReadDocument 09:01:26.3754874 JobId: 76
ReadDocument 09:01:26.3754874 JobId: 78
ValidateXml 09:01:26.3754874 JobId: 55
ProcessJob 09:01:26.3759876 JobId: 24
ProcessJob 09:01:26.3754874 JobId: 23
ReadDocument 09:01:26.3754874 JobId: 77
SendToDataBase 09:01:26.3759876 JobId: 13
SendToDataBase 09:01:26.3980055 JobId: 14
SendToDataBase 09:01:26.3985045 JobId: 15
SendToDataBase 09:01:26.4020099 JobId: 16
ReadDocument 09:01:27.3762164 JobId: 79
ValidateXml 09:01:27.3762164 JobId: 56
ProcessJob 09:01:27.3762164 JobId: 26
ReadDocument 09:01:27.3762164 JobId: 82
ProcessJob 09:01:27.3762164 JobId: 25
ReadDocument 09:01:27.3762164 JobId: 81
ReadDocument 09:01:27.3762164 JobId: 80
ValidateXml 09:01:27.3762164 JobId: 63
ValidateXml 09:01:27.3777165 JobId: 64
ProcessJob 09:01:27.3767157 JobId: 27
ValidateXml 09:01:27.3762164 JobId: 57
SendToDataBase 09:01:27.3777165 JobId: 17
SendToDataBase 09:01:27.4327571 JobId: 18
SendToDataBase 09:01:27.4357587 JobId: 19
ReadDocument 09:01:28.3761410 JobId: 83
ProcessJob 09:01:28.3761410 JobId: 28
ProcessJob 09:01:28.3761410 JobId: 29
ValidateXml 09:01:28.3761410 JobId: 66
SendToDataBase 09:01:28.3761410 JobId: 20
ProcessJob 09:01:28.3761410 JobId: 30
ValidateXml 09:01:28.3761410 JobId: 67
ValidateXml 09:01:28.3761410 JobId: 65
SendToDataBase 09:01:28.3861483 JobId: 21
SendToDataBase 09:01:28.4141687 JobId: 22
ReadDocument 09:01:28.6079764 JobId: 84
ReadDocument 09:01:28.6552491 JobId: 85
ReadDocument 09:01:28.7047606 JobId: 86
ValidateXml 09:01:28.7327861 JobId: 68
ProcessJob 09:01:28.7327861 JobId: 31
ReadDocument 09:01:29.1285484 JobId: 87
ProcessJob 09:01:29.1894672 JobId: 32
SendToDataBase 09:01:29.1894672 JobId: 23
SendToDataBase 09:01:29.1944706 JobId: 24
ReadDocument 09:01:29.3910070 JobId: 88
ValidateXml 09:01:29.5569691 JobId: 69
ReadDocument 09:01:29.5995036 JobId: 89
ValidateXml 09:01:29.6085095 JobId: 70
ReadDocument 09:01:29.6581266 JobId: 90
ValidateXml 09:01:29.8797899 JobId: 71
ValidateXml 09:01:30.1244519 JobId: 72
ValidateXml 09:01:30.1584763 JobId: 73
ReadDocument 09:01:30.2100312 JobId: 91
ProcessJob 09:01:30.2490536 JobId: 33
ProcessJob 09:01:30.2950865 JobId: 34
ReadDocument 09:01:30.3290995 JobId: 92
ProcessJob 09:01:30.3636350 JobId: 35
SendToDataBase 09:01:30.3636350 JobId: 25
SendToDataBase 09:01:30.3701300 JobId: 26
SendToDataBase 09:01:30.3706299 JobId: 27
ProcessJob 09:01:30.4987430 JobId: 36
ReadDocument 09:01:30.5642707 JobId: 93
ReadDocument 09:01:30.6088035 JobId: 94
ValidateXml 09:01:30.7213868 JobId: 74
ReadDocument 09:01:30.7544106 JobId: 95
ProcessJob 09:01:30.7544106 JobId: 37
SendToDataBase 09:01:30.7544106 JobId: 28
ProcessJob 09:01:31.1091681 JobId: 38
SendToDataBase 09:01:31.1091681 JobId: 29
SendToDataBase 09:01:31.1151730 JobId: 30
ValidateXml 09:01:31.2012468 JobId: 75
ValidateXml 09:01:31.2827940 JobId: 76
ValidateXml 09:01:31.3143168 JobId: 77
ValidateXml 09:01:31.4073842 JobId: 78
ReadDocument 09:01:31.4369059 JobId: 96
ReadDocument 09:01:31.4699302 JobId: 97
ProcessJob 09:01:31.7201123 JobId: 40
SendToDataBase 09:01:31.7201123 JobId: 31
ProcessJob 09:01:32.1569310 JobId: 41
SendToDataBase 09:01:32.1569310 JobId: 32
ValidateXml 09:01:32.3650822 JobId: 79
ValidateXml 09:01:32.3650822 JobId: 80
ProcessJob 09:01:32.3966047 JobId: 46
ReadDocument 09:01:32.4236247 JobId: 98
ReadDocument 09:01:32.4831869 JobId: 99
ValidateXml 09:01:32.5607342 JobId: 81
ReadDocument 09:01:32.5777363 JobId: 100
ProcessJob 09:01:33.1461630 JobId: 47
ProcessJob 09:01:33.2081967 JobId: 48
SendToDataBase 09:01:33.2081967 JobId: 33
SendToDataBase 09:01:33.2137015 JobId: 34
SendToDataBase 09:01:33.2172021 JobId: 35
ValidateXml 09:01:33.2347146 JobId: 82
ValidateXml 09:01:33.4228519 JobId: 83
ProcessJob 09:01:33.4228519 JobId: 49
ValidateXml 09:01:33.4373638 JobId: 84
ProcessJob 09:01:33.4878995 JobId: 50
SendToDataBase 09:01:33.4878995 JobId: 36
ProcessJob 09:01:33.5819674 JobId: 51
ValidateXml 09:01:33.6239992 JobId: 85
ProcessJob 09:01:33.6239992 JobId: 52
SendToDataBase 09:01:33.6239992 JobId: 37
SendToDataBase 09:01:33.6295082 JobId: 38
ValidateXml 09:01:33.6870563 JobId: 86
ValidateXml 09:01:33.7125626 JobId: 87
ProcessJob 09:01:34.1238635 JobId: 53
ProcessJob 09:01:34.5796949 JobId: 54
<<<
SendToDataBase 09:01:34.5796949 JobId: 40
SendToDataBase 09:01:34.5856995 JobId: 41
SendToDataBase 09:01:34.5887008 JobId: 46
>>>
ValidateXml 09:01:34.7951688 JobId: 88
ValidateXml 09:01:34.9162007 JobId: 89
ProcessJob 09:01:34.9541705 JobId: 55
ValidateXml 09:01:35.0464443 JobId: 90
ProcessJob 09:01:35.3634898 JobId: 56
ProcessJob 09:01:35.3795024 JobId: 57
ValidateXml 09:01:35.5165095 JobId: 91
ValidateXml 09:01:35.8614345 JobId: 92
ProcessJob 09:01:35.9985415 JobId: 63
ValidateXml 09:01:36.0481807 JobId: 93
ProcessJob 09:01:36.0763064 JobId: 64
ProcessJob 09:01:36.0993229 JobId: 65
SendToDataBase 09:01:36.0993229 JobId: 47
SendToDataBase 09:01:36.1048270 JobId: 48
ValidateXml 09:01:36.1572079 JobId: 94
ValidateXml 09:01:36.3791015 JobId: 95
ProcessJob 09:01:36.4212607 JobId: 66
SendToDataBase 09:01:36.4212607 JobId: 49
SendToDataBase 09:01:36.4267655 JobId: 50
SendToDataBase 09:01:36.4272654 JobId: 51
SendToDataBase 09:01:36.4322913 JobId: 52
SendToDataBase 09:01:36.4327837 JobId: 53
ProcessJob 09:01:36.5149796 JobId: 67
SendToDataBase 09:01:36.5149796 JobId: 54
ValidateXml 09:01:36.6861048 JobId: 96
ValidateXml 09:01:36.7845716 JobId: 97
ValidateXml 09:01:37.0175979 JobId: 98
ValidateXml 09:01:37.3788835 JobId: 99
ValidateXml 09:01:37.6477046 JobId: 100
ProcessJob 09:01:37.8269808 JobId: 68
SendToDataBase 09:01:37.8269808 JobId: 55
ProcessJob 09:01:37.8940108 JobId: 69
ProcessJob 09:01:38.2955556 JobId: 70
ProcessJob 09:01:38.3110583 JobId: 71
SendToDataBase 09:01:38.3110583 JobId: 56
SendToDataBase 09:01:38.3125586 JobId: 57
CreateResponse 09:01:38.4551538 JobId: 95
CreateResponse 09:01:38.4925304 JobId: 96
ProcessJob 09:01:38.5382532 JobId: 72
ProcessJob 09:01:38.9129894 JobId: 73
SendToDataBase 09:01:38.9129894 JobId: 63
SendToDataBase 09:01:38.9185062 JobId: 64
SendToDataBase 09:01:38.9189949 JobId: 65
ProcessJob 09:01:38.9852121 JobId: 74
ProcessJob 09:01:39.0317458 JobId: 75
SendToDataBase 09:01:39.0317458 JobId: 66
SendToDataBase 09:01:39.0377511 JobId: 67
ProcessJob 09:01:39.6129381 JobId: 76
SendToDataBase 09:01:39.6129381 JobId: 68
ProcessJob 09:01:39.7833004 JobId: 77
SendToDataBase 09:01:39.7833004 JobId: 69
ProcessJob 09:01:39.8740443 JobId: 78
ProcessJob 09:01:40.3145731 JobId: 79
SendToDataBase 09:01:40.3145731 JobId: 70
SendToDataBase 09:01:40.3205708 JobId: 71
ProcessJob 09:01:40.4912084 JobId: 80
ProcessJob 09:01:40.5307205 JobId: 81
SendToDataBase 09:01:40.5317212 JobId: 72
ProcessJob 09:01:40.5652454 JobId: 82
ProcessJob 09:01:41.2902736 JobId: 83
ProcessJob 09:01:41.2902736 JobId: 84
ProcessJob 09:01:41.3598244 JobId: 85
SendToDataBase 09:01:41.3598244 JobId: 73
SendToDataBase 09:01:41.3663284 JobId: 74
SendToDataBase 09:01:41.3713317 JobId: 75
SendToDataBase 09:01:41.3718392 JobId: 76
SendToDataBase 09:01:41.3723328 JobId: 77
ProcessJob 09:01:42.2677493 JobId: 86
SendToDataBase 09:01:42.2677493 JobId: 78
ProcessJob 09:01:42.6466081 JobId: 87
ProcessJob 09:01:42.8947969 JobId: 88
SendToDataBase 09:01:42.8947969 JobId: 79
ProcessJob 09:01:43.0012509 JobId: 89
ProcessJob 09:01:43.1513589 JobId: 90
ProcessJob 09:01:43.4545800 JobId: 91
SendToDataBase 09:01:43.4545800 JobId: 80
SendToDataBase 09:01:43.4600832 JobId: 81
SendToDataBase 09:01:43.4605919 JobId: 82
ProcessJob 09:01:43.5946813 JobId: 92
ProcessJob 09:01:44.1731027 JobId: 93
SendToDataBase 09:01:44.1731027 JobId: 83
SendToDataBase 09:01:44.1786068 JobId: 84
SendToDataBase 09:01:44.1816090 JobId: 85
ProcessJob 09:01:44.4678171 JobId: 94
SendToDataBase 09:01:44.4678171 JobId: 86
ProcessJob 09:01:45.3426043 JobId: 97
SendToDataBase 09:01:45.3426043 JobId: 87
ProcessJob 09:01:45.3751270 JobId: 98
ProcessJob 09:01:45.7363757 JobId: 99
ProcessJob 09:01:45.7809216 JobId: 100
SendToDataBase 09:01:45.7809216 JobId: 88
SendToDataBase 09:01:45.7879270 JobId: 89
SendToDataBase 09:01:45.7925566 JobId: 90
SendToDataBase 09:01:45.8776726 JobId: 91
SendToDataBase 09:01:45.8776726 JobId: 92
SendToDataBase 09:01:46.5813640 JobId: 93
SendToDataBase 09:01:46.5813640 JobId: 94
SendToDataBase 09:01:47.7407165 JobId: 97
SendToDataBase 09:01:47.7407165 JobId: 98
SendToDataBase 09:01:48.4382058 JobId: 99
SendToDataBase 09:01:48.7357557 JobId: 100
Best Answer
You can do this if you link a TransformBlock to an ActionBlock.
This is easiest to demonstrate with a compilable console application.
This application processes a sequence of integers, but you can replace the integers with your own unit-of-work class.
(I adapted this code from a utility I wrote that does multithreaded file compression with the relatively slow LZMA algorithm. That utility has to read the input data from a file sequentially, pass it in chunks to a queue that processes the data with multiple threads in arbitrary order, and finally output the compressed chunks to a queue that must preserve the original chunk order.)
Sample code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Program
    {
        public static void Main()
        {
            var data = Enumerable.Range(1, 100);
            var task = Process(data);
            Console.WriteLine("Waiting for task to complete");
            task.Wait();
            Console.WriteLine("Task complete.");
        }

        public static async Task Process(IEnumerable<int> data)
        {
            var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions());
            var writer = new ActionBlock<int>(block => write(block), actionBlockOptions());

            queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

            await enqueDataToProcessAndAwaitCompletion(data, queue);
            await writer.Completion;
        }

        static int process(int block)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}");
            emulateWorkload();
            return -block;
        }

        static void write(int block)
        {
            Console.WriteLine("Output: " + block);
        }

        static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue)
        {
            await enqueueDataToProcess(data, queue);
            queue.Complete();
        }

        static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue)
        {
            foreach (var item in data)
                await queue.SendAsync(item);
        }

        static ExecutionDataflowBlockOptions transformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static Random rng = new Random();
        static object locker = new object();

        static void emulateWorkload()
        {
            int delay;

            lock (locker)
            {
                delay = rng.Next(250, 750);
            }

            Thread.Sleep(delay);
        }
    }
}
Output:
Waiting for task to complete
Thread 8 is processing block 8
Thread 5 is processing block 2
Thread 6 is processing block 6
Thread 4 is processing block 5
Thread 7 is processing block 7
Thread 10 is processing block 4
Thread 9 is processing block 1
Thread 3 is processing block 3
Thread 3 is processing block 9
Thread 8 is processing block 10
Thread 5 is processing block 11
Thread 6 is processing block 12
Thread 9 is processing block 13
Thread 10 is processing block 14
Thread 7 is processing block 15
Thread 8 is processing block 16
Thread 4 is processing block 17
Thread 5 is processing block 18
Thread 3 is processing block 19
Thread 9 is processing block 20
Thread 8 is processing block 21
Output: -1
Output: -2
Output: -3
Output: -4
Output: -5
Output: -6
Output: -7
Output: -8
Output: -9
Output: -10
Output: -11
Output: -12
Output: -13
Thread 6 is processing block 22
Thread 10 is processing block 23
Output: -14
Thread 7 is processing block 24
Output: -15
Output: -16
Thread 6 is processing block 25
Output: -17
Thread 4 is processing block 26
Thread 5 is processing block 27
----------------->SNIP<-----------------
Thread 10 is processing block 93
Thread 8 is processing block 94
Output: -83
Thread 4 is processing block 95
Output: -84
Output: -85
Output: -86
Output: -87
Thread 3 is processing block 96
Output: -88
Thread 6 is processing block 97
Thread 5 is processing block 98
Thread 10 is processing block 99
Thread 9 is processing block 100
Output: -89
Output: -90
Output: -91
Output: -92
Output: -93
Output: -94
Output: -95
Output: -96
Output: -97
Output: -98
Output: -99
Output: -100
Task complete.
Press any key to continue . . .
Note how the blocks are processed by multiple threads in arbitrary order, yet the output order is the same as the input order.
It is important that the output ActionBlock's options are set as in the actionBlockOptions() method, with MaxDegreeOfParallelism and BoundedCapacity both set to 1.
This is what causes the output to be serialized in the correct order. If you set the output block's BoundedCapacity and MaxDegreeOfParallelism to values greater than 1, the output may arrive in the wrong order.
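Note also that the in-order hand-off out of the TransformBlock relies on the block's EnsureOrdered option, which defaults to true in current versions of System.Threading.Tasks.Dataflow. As an illustration only (not part of the code above), disabling it would let results flow downstream as soon as each one finishes, so even the single-threaded ActionBlock would then print them in completion order:

// Illustrative only: with EnsureOrdered = false the TransformBlock no longer buffers
// results to preserve input order, so an ActionBlock with MaxDegreeOfParallelism = 1
// would still receive them out of order.
var unorderedOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 8,
    BoundedCapacity = 32,
    EnsureOrdered = false
};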
This question, "c# - TPL Dataflow - processing in parallel and asynchronously while preserving order", is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/41633764/