gpt4 book ai didi

.net - TPL 数据流生产者消费者模式

转载 作者:行者123 更新时间:2023-12-01 12:47:57 27 4
gpt4 key购买 nike

刚刚使用 TPL DataFlow 编写了示例生产者消费者模式。我在这里有一些基本问题。

  1. 只有在生产者发布所有项目后,消费者才处于事件状态。异步是指生产任务和消费任务都可以并行运行。

  2. 给消费者一个 sleep 时间来验证它是否阻止其他数据项。它似乎是按顺序执行的,没有任何并行性。

我是不是做错了什么?

class AscDataBlocks
{
public Int64 start;
public Int64 End;
//public string ThreadName;
public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
// This is using TPL DataBlock producer consumer pattern.
{
for (var i = start; i < End; i++)
{
Console.WriteLine("Postingasc : {0}", i);
targetAscTransform.Post(i);
}
}

public void ProcessDataBuffer(Int64 ascDataSet)
{
if (ascDataSet == 5)
// Testing if this will delay all the other data processing
Thread.Sleep(5000);
else
Thread.Sleep(500);
Console.WriteLine(ascDataSet);
}

// Demonstrates the consumption end of the producer and consumer pattern.
public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
{
// Initialize a counter to track the number of bytes that are processed.
int status = 0;

// Read from the source buffer until the source buffer has no
// available output data.
while (await source.OutputAvailableAsync())
{
Int64 data;
source.TryReceive(out data);
ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
// Increment the count of bytes received.
status = 1;
}
return status;
}
}

static void Main(string[] args)
{
AscDataBlocks ascb;
BufferBlock<Int64> ascbuffer;
System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
CancellationToken ct = new CancellationToken();
CancellationTokenSource cts = new CancellationTokenSource();
ascb = new AscDataBlocks();
ascb.start = 1;
ascb.End = 100;
try
{
ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
CancellationToken = ct
});// Initiallize the buffer bloack
ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
//Post source data to the dataflow block.
ascb.AscBufferProducer(ascbuffer);
ascProcessingconsumer.Wait();
}
catch (Exception ex)
{
//foreach (var v in ex.InnerExceptions)
// Console.WriteLine("msg: " + v.Message);
}
}

最佳答案

The consumer is active only after all the items are posted from the producer. Does asynchronous means that both produce and consume tasks can run in parallel.

发生这种情况是因为您在消费者有机会开始之前很快就发布了所有商品。如果您添加了 Thread.Sleep(100),您会发现它们实际上是并行工作的。

Given a sleep time in consumer to verify if its blocking other data items. It seems to be executing sequentially and not getting any parallelism.

TPL 数据流并不神奇:它不会修改您的代码以并行执行。是您调用了一次 AscTransConsumerAsync(),所以它实际上只执行一次,请不要感到惊讶。

TDF 确实支持并行处理,但您需要实际让它执行处理代码。为此,请使用其中一个执行 block 。在你的情况下 ActionBlock似乎很合适。

如果使用它,则可以通过设置 MaxDegreeOfParallelism 将 block 配置为并行执行.当然,这样做意味着您需要确保处理委托(delegate)是线程安全的。

这样,AscTransConsumerAsync() 现在可能看起来像这样:

public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
// counter to track the number of items that are processed
Int64 count = 0;

var actionBlock = new ActionBlock<Int64>(
data =>
{
ProcessDataBuffer(data);
// count has to be accessed in a thread-safe manner
// be careful about using Interlocked,
// for more complicated computations, locking might be more appropriate
Interlocked.Increment(ref count);
},
// some small constant might be better than Unbounded, depedning on circumstances
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// this assumes source will be completed when done,
// you need to call ascbuffer.Complete() after AscBufferProducer() for this
await actionBlock.Completion;

return count;
}

关于.net - TPL 数据流生产者消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14255655/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com