gpt4 book ai didi

c# - 在冷 IObservable 上暂停和恢复订阅

转载 作者:可可西里 更新时间:2023-11-01 09:01:23 25 4
gpt4 key购买 nike

使用 Rx ,我希望在以下代码中使用暂停和恢复功能:

如何实现 Pause() 和 Resume() ?

    static IDisposable _subscription;

static void Main(string[] args)
{
Subscribe();
Thread.Sleep(500);
// Second value should not be shown after two seconds:
Pause();
Thread.Sleep(5000);
// Continue and show second value and beyond now:
Resume();
}

static void Subscribe()
{
var list = new List<int> { 1, 2, 3, 4, 5 };
var obs = list.ToObservable();
_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Thread.Sleep(2000);
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
}

static void Pause()
{
// Pseudocode:
//_subscription.Pause();
}

static void Resume()
{
// Pseudocode:
//_subscription.Resume();
}

Rx 解决方案?

  • 我相信我可以使用某种 bool 字段门控结合线程锁定(Monitor.WaitMonitor.Pulse)

  • 但是是否有 Rx 运算符或其他一些响应式简写来实现相同的目标?

最佳答案

这是一个相当简单的 Rx 方法来做你想做的事。我创建了一个名为 Pausable 的扩展方法,该方法接受一个源可观察对象和第二个可暂停或恢复可观察对象的 bool 可观察对象。

public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};

return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}

可以这样使用:

var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);

使用 PauseResume 方法将其放入代码中应该相当容易。

关于c# - 在冷 IObservable 上暂停和恢复订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7620182/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com