- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
完整的可重现代码是 on github ,启动可执行文件后,内存将很快飙升。代码主要位于 AsyncBlockingQueue.cs
类(class)。
以下代码实现了一个简单的异步“阻塞”队列:
public async Task<T> DequeueAsync(
int timeoutInMs = -1,
CancellationToken cancellationToken = default)
{
try
{
using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
{
T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
return value;
}
}
catch (ChannelClosedException cce)
{
await Console.Error.WriteLineAsync("Channel is closed.");
throw new ObjectDisposedException("Queue is disposed");
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
await Console.Error.WriteLineAsync("Dequeue failed.");
throw;
}
}
private CancellationTokenSource GetCancellationTokenSource(
int timeoutInMs,
CancellationToken cancellationToken)
{
if (timeoutInMs <= 0)
{
return null;
}
CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
return cts;
}
这样使用的时候,会出现内存泄漏:
try
{
string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
// timeout
}
最佳答案
更新
从评论:
there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up
batch
列表直到达到限制,向下游发送数据并清除列表:
static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var batch=new List<Message>(count);
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
if(batch.Count==count)
{
await writer.WriteAsync(batch.ToArray());
batch.Clear();
}
}
},token)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
按周期分批的方法更复杂,因为定时器可以在收到消息的同时触发。
Interlocked.Exchange
替换现有的
batch
列出一个新的并将批量数据发送到下游。 :
static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token=default)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
var batch=new List<Message>();
Timer t=new Timer(async obj =>{
var data=Interlocked.Exchange(ref batch,new List<Message>());
writer.WriteAsync(data.ToArray());
},null,TimeSpan.Zero,period);
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
batch.Add(msg);
}
},token)
.ContinueWith(t=>{
timer.Dispose();
writer.TryComplete(t.Exception);
});
return channel;
}
两者都做 - 我仍在努力。问题是计数和计时器到期可以同时发生。最坏情况,
lock(batch)
可用于确保只有线程或循环才能向下游发送数据
Complete()
在上面。
Channel
不实现
IDisposable
所以无法处置。出版商只提供
ChannelReader
给订阅者。
ChannelReader
并从中读取直到完成。通过使用
ReadAllAsync
订阅者可以继续从 ChannelReader 读取,直到它完成。
ChannelReader<Message> Producer(CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<Message>();
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<100;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
//Simulate some work
await Task.Delay(100);
await writer.WriteAsync(new Message(...));
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
//This casts to a ChannelReader
return channel;
}
订阅者只需要一个
ChannelReader
上类。通过使用
ChannelReader.ReadAllAsync订阅者只需要
await foreach
处理消息:
async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
//Use the message
}
}
订阅者可以通过返回 ChannelReader 来生成自己的消息。这就是事情变得非常有趣的地方,如
Subscriber
方法成为链式步骤管道中的一个步骤。如果我们将
ChannelReader
上的方法转换为扩展方法我们可以轻松创建整个管道。
ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
var channel=Channel.CreateBounded<int>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
for(int i=0;i<nums;i++)
{
//Check for cancellation
if(token.IsCancellationRequested)
{
return;
}
await writer.WriteAsync(i*7);
await Task.Delay(100);
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
//This casts to a ChannelReader
return channel;
}
然后将它们加倍并平方:
ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(2.0*msg);
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
var channel=Channel.CreateBounded<double>(10);
var writer=channel.Writer;
//Create the actual "publisher" worker
_ = Task.Run(async ()=>{
await foreach(var msg in input.ReadAllAsync(token))
{
await writer.WriteAsync(Math.Sqrt(msg));
}
} ,token)
//Complete and propagate any exceptions
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
最后打印它们
async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
await foreach(var msg in input.ReadAllAsync(token))
{
Console.WriteLine(msg);
}
}
现在我们可以建立一个管道
await Generate(100)
.Double()
.Square()
.Print();
并向所有步骤添加取消 token :
using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
.Double(cts.Token)
.Square(cts.Token)
.Print(cts.Token);
如果一个步骤产生消息的速度比它们长时间消耗的速度快,则内存使用量可能会增加。这可以通过使用有界 channel 而不是无界 channel 轻松处理。这样,如果一个方法太慢,所有以前的方法都必须在发布新数据之前等待。
关于c# - 带有 CancellationTokenSource 的 channel 在处理后超时内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67573683/
我很好奇为什么以下不起作用。一般select用default:防止死锁,但在这种情况下不是: package main import "fmt" func main () { a := mak
我一生都无法弄清楚如何切换图像排序。图像以 (x,x,3) 格式读取,theano 要求它是 (3,x,x) 格式。我尝试更改顺序numpy.array([img[:,:,i] for i in ra
我正在向 libnl 发送单个 SSID 和频率进行扫描,但我得到了多个扫描结果以及我请求的 SSID 和频率,但我需要单个扫描结果(仅适用于请求的 SSID),如何实现这一点。请帮助我,我也发送了我
我是 Golang 的新手,但正在努力理解这门伟大的语言!请帮帮我.. 我有 2 个 channel 。 “进”和“出” channel in, out := make(chan Work),
例如我有这段代码: package main import ( "fmt" ) func main() { c1 := make(chan interface{}) close
我们使用以下调用来获取经过身份验证的用户的 ChannelID,它适用于大多数情况。一些 YouTube 用户将他们的 channel 连接到 Google+ 信息页,但在这种情况下,我们的一位用户无
case 'sinfo': const sinfo = new Discord.MessageEmbed() .addField('Server Name 🔎 :', message.guild.n
我需要让所有 channel 来创建一个 bunker 命令,这使得所有 channel 都是只读的。 最佳答案 他们变了Client.servers至 Client.guilds在 newer ve
为什么当第二个值通过另一个 go routine 发送并且没有收到发送的第一个值时, channel c 没有缓冲? package main import "fmt" func sum(s []in
据我所知,内置的 split 会将一个 3 channel Mat 拆分为三个 1 channel Mat。结果,这三个 Mat 只是具有一些不同强度的灰度。 我的意图是获得三个 3 channel
如何检测当前的 RAM 配置?我需要询问 Windows RAM 当前是在单 channel 、双 channel 还是四 channel 中运行。 我搜索了很多,并没有在这个网站或其他网站上找到任何
我需要拆分一个多 channel wav 文件并将每个 channel 编码为 mp3 文件。 我知道 gtresamer 的 deinterleave 插件,但我不确定如何将它用于 wav 文件以及
关闭。这个问题需要details or clarity .它目前不接受答案。 想要改进这个问题吗? 通过 editing this post 添加详细信息并澄清问题. 关闭 8 年前。 Improve
我正在尝试运行 Hyperledger Fabric 网络,它由单个订购者、单个对等节点和一个 cli 组成。为了学习启动 Hyperledger Fabric 网络的过程,从创建与加密相关的工件到将
我在 Laravel 中使用事件广播。我正在使用基于角色的通知访问权限。我有用于广播的自定义 auth guard。当用户连接到 channel 时,客户端将具有内部权限的 access_token
我正在编写一个使用 Elixir Channels 来处理实时事件的应用程序。我知道每个客户端将打开 1 个套接字,并且可以在其上多路复用多个 channel 。所以我的应用程序是一个聊天应用程序,其
我有一些 .wav 文件,我想转换它们的频率 (fs) 和 channel 数 (nchannels)。我在jupyter笔记本python3.6上使用ffmpeg。我使用了以下命令并且它有效。 cm
我有一个视频渲染器,它需要两个 H265 流(YUV420),我需要烘焙它们以使它们中的一个与另一个形成 alpha 蒙版。这一切都已解决并且效果很好,但是如果我按照此处的说明进行操作: ffmpeg
我运行此命令以便能够将 udp 直播流传输到可使用正在构建的移动应用程序播放的 http 直播流。 它只是一个只有音频流的流。 ffmpeg -i udp://@localhost:1111 -map
我在我的 discord.js 机器人中创建了 nuke 命令,它创建了具有相同名称、权限、主题等的 channel ,并删除了“原始” channel 。但是有一个问题,如何使 channel 与“
我是一名优秀的程序员,十分优秀!