gpt4 book ai didi

c# - 带有 CancellationTokenSource 的 channel 在处理后超时内存泄漏

转载 作者:行者123 更新时间:2023-12-04 07:39:23 27 4
gpt4 key购买 nike

完整的可重现代码是 on github ,启动可执行文件后,内存将很快飙升。代码主要位于 AsyncBlockingQueue.cs类(class)。
以下代码实现了一个简单的异步“阻塞”队列:

        public async Task<T> DequeueAsync(
int timeoutInMs = -1,
CancellationToken cancellationToken = default)
{
try
{
using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
{
T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
return value;
}
}
catch (ChannelClosedException cce)
{
await Console.Error.WriteLineAsync("Channel is closed.");
throw new ObjectDisposedException("Queue is disposed");
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
await Console.Error.WriteLineAsync("Dequeue failed.");
throw;
}
}


private CancellationTokenSource GetCancellationTokenSource(
int timeoutInMs,
CancellationToken cancellationToken)
{
if (timeoutInMs <= 0)
{
return null;
}

CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
return cts;
}
这样使用的时候,会出现内存泄漏:
try
{
string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
// timeout
}
enter image description here

最佳答案

更新
从评论:

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up


这意味着真正需要的是一种通过计数和周期来批量处理消息的方法。做任何一个都相对容易。
此方法按计数进行批处理。该方法将消息添加到 batch列表直到达到限制,向下游发送数据并清除列表:
static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;

_ = Task.Run(async ()=>{
var batch=new List<Message>(count);
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
if(batch.Count==count)
{
await writer.WriteAsync(batch.ToArray());
batch.Clear();
}
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
按周期分批的方法更复杂,因为定时器可以在收到消息的同时触发。 Interlocked.Exchange替换现有的 batch列出一个新的并将批量数据发送到下游。 :
static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;

var batch=new List<Message>();
Timer t=new Timer(async obj =>{
var data=Interlocked.Exchange(ref batch,new List<Message>());
writer.WriteAsync(data.ToArray());
},null,TimeSpan.Zero,period);

_ = Task.Run(async ()=>{

await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
}
},token)
.ContinueWith(t=>{
timer.Dispose();
writer.TryComplete(t.Exception);
});
return channel;
}
两者都做 - 我仍在努力。问题是计数和计时器到期可以同时发生。最坏情况, lock(batch)可用于确保只有线程或循环才能向下游发送数据
原答案
正确使用时 channel 不会泄漏 - 就像任何其他容器一样。 Channel 不是异步队列,也绝对不是阻塞队列。这是一个非常不同的结构,具有完全不同的习语。它是一个使用队列的更高级别的容器。有一个很好的理由有单独的 ChannelReader 和 ChannelWriter 类。
典型的场景是让发布者创建并拥有 channel 。只有发布者可以写入该 channel 并调用 Complete()在上面。 Channel不实现 IDisposable所以无法处置。出版商只提供 ChannelReader给订阅者。
订阅者只能看到 ChannelReader并从中读取直到完成。通过使用 ReadAllAsync订阅者可以继续从 ChannelReader 读取,直到它完成。
这是一个典型的例子:
ChannelReader<Message> Producer(CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;

//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<100;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
//Simulate some work
await Task.Delay(100);
await writer.WriteAsync(new Message(...));
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));

//This casts to a ChannelReader
return channel;
}
订阅者只需要一个 ChannelReader上类。通过使用 ChannelReader.ReadAllAsync订阅者只需要 await foreach处理消息:
async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
//Use the message
}
}
订阅者可以通过返回 ChannelReader 来生成自己的消息。这就是事情变得非常有趣的地方,如 Subscriber方法成为链式步骤管道中的一个步骤。如果我们将 ChannelReader 上的方法转换为扩展方法我们可以轻松创建整个管道。
让我们生成一些数字:
ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
var channel=Channel.CreateBounded<int>(10);
var writer=channel.Writer;

//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<nums;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}

await writer.WriteAsync(i*7);
await Task.Delay(100);
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));

//This casts to a ChannelReader
return channel;
}
然后将它们加倍并平方:
ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;

//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(2.0*msg);
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));

return channel;
}

ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;

//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(Math.Sqrt(msg));
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));

return channel;
}
最后打印它们
async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
Console.WriteLine(msg);
}
}

现在我们可以建立一个管道

await Generate(100)
.Double()
.Square()
.Print();
并向所有步骤添加取消 token :
using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
.Double(cts.Token)
.Square(cts.Token)
.Print(cts.Token);
如果一个步骤产生消息的速度比它们长时间消耗的速度快,则内存使用量可能会增加。这可以通过使用有界 channel 而不是无界 channel 轻松处理。这样,如果一个方法太慢,所有以前的方法都必须在发布新数据之前等待。

关于c# - 带有 CancellationTokenSource 的 channel 在处理后超时内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67573683/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com