gpt4 book ai didi

task-parallel-library - TPL DataFlow-按持续时间或阈值进行批处理

转载 作者:行者123 更新时间:2023-12-04 08:24:33 26 4
gpt4 key购买 nike

我已经使用TPL数据流实现了producer..consumer模式。用例是代码从Kafka总线读取消息。为了提高效率,我们在进入数据库时​​需要分批处理消息。

TPL数据流中是否有方法可以保留消息并在达到大小或持续时间阈值时触发?

例如,当前实现将消息从队列中拉出后就将其发布。

    postedSuccessfully = targetBuffer.Post(msg.Value);

最佳答案

按计数和持续时间进行缓冲已经可以通过System.Reactive使用,特别是Buffer运算符。缓冲区会收集传入的事件,直到达到所需的计数或其时间间隔到期为止。

数据流块旨在与System.Reactive一起使用。使用can be convertedDataflowBlock.AsObservable()扩展方法将AsObserver()阻止到Observable和Observers。

这使得构建缓冲块非常容易:

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
var inBlock = new BufferBlock<TIn>();
var outBlock = new BufferBlock<IList<TIn>>();

var outObserver=outBlock.AsObserver();
inBlock.AsObservable()
.Buffer(timeSpan, count)
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(outObserver);

return DataflowBlock.Encapsulate(inBlock, outBlock);

}

该方法使用两个缓冲块来缓冲输入和输出。当批处理已满或时间跨度到期时, Buffer()从输入块(可观察的)读取并写入输出块(观察器)。

默认情况下,Rx在当前线程上运行。通过调用 ObserveOn(TaskPoolScheduler.Default),我们告诉它处理任务池线程上的数据。

示例

此代码为5个项目或1秒创建一个缓冲块。首先发布7个项目,等待1.1秒,然后发布另外7个项目。每个批处理都与线程ID一起写入控制台:
static async Task Main(string[] args)
{
//Build the pipeline
var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

var options = new DataflowLinkOptions { PropagateCompletion = true };
var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
bufferBlock.LinkTo(printBlock, options);

//Start the messages
Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

for (int i=0;i<7;i++)
{
bufferBlock.Post(i.ToString());
}
await Task.Delay(1100);
for (int i=7; i < 14; i++)
{
bufferBlock.Post(i.ToString());
}
bufferBlock.Complete();
Console.WriteLine($"Finishing");
await bufferBlock.Completion;
Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
var line = String.Join(",", items);
Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

输出为:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6

关于task-parallel-library - TPL DataFlow-按持续时间或阈值进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52633346/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com