gpt4 book ai didi

c# - 如何安排 TPL 数据流中的流控制?

转载 作者:行者123 更新时间:2023-11-30 15:31:21 30 4
gpt4 key购买 nike

我正在尝试着手控制 TPL 数据流中的数据流。我有一个非常快的生产者和一个非常慢的消费者。 (我的真实代码更复杂,但无论如何,这是一个非常好的模型,它重现了问题。)

当我运行它时,代码开始像过时一样消耗内存——生产者的输出队列尽可能快地填满。我真正希望看到的是生产者停止运行一段时间,直到消费者有机会请求它。根据我对文档的阅读,这是应该发生的情况:也就是说,我认为生产者会等到消费者有空间。

显然,情况并非如此。我该如何解决才能使队列不至于乱成一团?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
class Program
{

static void Main(string[] args)
{
var CreateData = new TransformManyBlock<int, string>(ignore =>
{
return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
});

var ParseFile = new TransformManyBlock<string, string>(fileContent =>
{
Thread.Sleep(1000);
return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
);

var EndOfTheLine = new ActionBlock<object>(f =>
{
});


var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
CreateData.LinkTo(ParseFile, linkOptions);
ParseFile.LinkTo(EndOfTheLine, linkOptions);

Task t = new Task(() =>
{
while (true)
{
Console.WriteLine("CreateData: " + Report(CreateData));
Console.WriteLine("ParseData: " + Report(ParseFile));
Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount );
Thread.Sleep(1000);
}

});
t.Start();

CreateData.SendAsync(0);
CreateData.Complete();

EndOfTheLine.Completion.Wait();
}

public static string Report<T, U>(TransformManyBlock<T, U> block)
{
return String.Format("INPUT: {0} OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
}


}
}

最佳答案

通常,在这种情况下,您会做的是同时设置 CreateData block 的 BoundedCapacity。但这在这里行不通,因为在从单个 IEnumerable 填充输出队列时,TransformManyBlock 似乎没有考虑 BoundedCapacity

您可以改为创建一个函数来迭代集合并使用 SendAsync() 仅当目标可以接受它们时才发送更多数据:

/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
///
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
public static async Task SendAllAsync<T>(
this ITargetBlock<T> target, IEnumerable<T> data)
{
try
{
foreach (var item in data)
{
await target.SendAsync(item);
}
}
catch (Exception e)
{
target.Fault(e);
}
}

用法:

var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();

如果您仍然希望 CreateData block 的行为与您的原始代码类似,您可以有两个有界的 BufferBlockSendAllAsync() 然后使用 Encapsulate() 使它们看起来像一个 block :

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
CreateBoundedTransformManyBlock<TInput, TOutput>(
Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
var input = new BufferBlock<TInput>(
new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
var output = new BufferBlock<TOutput>(
new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

Task.Run(
async () =>
{
try
{
while (await input.OutputAvailableAsync())
{
var data = transform(await input.ReceiveAsync());

await output.SendAllAsync(data);
}

output.Complete();
}
catch (Exception e)
{
((IDataflowBlock)input).Fault(e);
((IDataflowBlock)output).Fault(e);
}
});

return DataflowBlock.Encapsulate(input, output);
}

关于c# - 如何安排 TPL 数据流中的流控制?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20715816/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com