gpt4 book ai didi

system.reactive - 根据 CPU 使用率限制 RX 任务

转载 作者:行者123 更新时间:2023-12-02 01:46:47 27 4
gpt4 key购买 nike

我有一个长时间运行的任务(从 Kinect One 的深度图像创建纹理)是使用 Reactive Extensions 实现的。它的要点如下:

kinectWrapper.DepthFrames
.ObserveOn(new EventLoopScheduler())
.Select(f => do some CPU intensive data manipulation to create the color texture I want)
.Subscribe(colorFrame => fill texture on GPU)

问题是 select 和 subscribe 对系统的负担都很大,不会全速运行。我已经使用 .Sample(TimeSpan.FromMilliseconds(100)) 设法让它在我的开发 PC 上以可接受的速度运行,但我宁愿让它根据 CPU 使用率降低帧率。

我觉得有两种可能:

  1. 创建一个辅助 IObservable 作为 Sample 的输入,动态调节主事件循环。
  2. 编写我自己的 IScheduler,当它被任务淹没时,它会删除计划任务。

最佳答案

可以通过修改此处找到的扩展方法的行为来实现解决方案:http://rxx.codeplex.com/workitem/20724

下面是一个例子。在这种情况下,我修改了行为,以便扩展方法通过丢弃最旧的通知来限制排队通知的数量,直到队列大小可以接受为止。

为了满足您的要求,您可以修改它,以便它根据您可以使用 System.Diagnostics.PerformanceCounter 读取的 CPU 指标丢弃某些通知。类(class)。

但是,您也可以尝试将自己从这些特定细节中抽象出来,也许您可​​以将下面的扩展方法与使用低优先级线程的调度程序一起使用。

这意味着当 CPU 繁忙时,通知更有可能被丢弃。

kinectWrapper.DepthFrames.ThrottledObserveOn(
new EventLoopScheduler(start => new Thread(start) {Priority = ThreadPriority.Lowest, IsBackground = true}),
5).Select(...

public static IObservable<TSource> ThrottledObserveOn<TSource>(
this IObservable<TSource> source,
IScheduler scheduler,
int maximumQueuedNotifications)
{
Contract.Requires(source != null);
Contract.Requires(scheduler != null);
Contract.Requires(maximumQueuedNotifications >= 0);

return Observable.Create<TSource>(observer =>
{
var notificationsGate = new object();
var acceptingNotification = false;
var nextNotifications = new Queue<Notification<TSource>>();
Notification<TSource> completionNotification = null;
var schedulerDisposable = new MultipleAssignmentDisposable();

var subscriptionDisposable = source.Materialize().Subscribe(notification =>
{
bool startAcceptingNotifications;

lock (notificationsGate)
{
startAcceptingNotifications = !acceptingNotification;
acceptingNotification = true;

if (notification.Kind == NotificationKind.OnNext)
{
nextNotifications.Enqueue(notification);
}
else
{
completionNotification = notification;
}
}

if (startAcceptingNotifications)
{
schedulerDisposable.Disposable = scheduler.Schedule(rescheduleAction =>
{
Notification<TSource> notificationToAccept;
lock (notificationsGate)
{
if (nextNotifications.Any())
{
do
{
notificationToAccept = nextNotifications.Dequeue();
}
while (nextNotifications.Count > maximumQueuedNotifications);
}
else
{
notificationToAccept = completionNotification;
completionNotification = null;
}
}

notificationToAccept.Accept(observer);

bool continueAcceptingNotification;

lock (notificationsGate)
{
continueAcceptingNotification = acceptingNotification = nextNotifications.Any() || completionNotification != null;
}

if (continueAcceptingNotification)
{
rescheduleAction();
}
});
}
});
return new CompositeDisposable(subscriptionDisposable, schedulerDisposable);
});
}

关于system.reactive - 根据 CPU 使用率限制 RX 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25179065/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com