gpt4 book ai didi

c# - 取消长时间运行的 Rx 流

转载 作者:行者123 更新时间:2023-11-30 16:20:08 24 4
gpt4 key购买 nike

如果我有一个长时间运行的流,比如:

inputStream.Select(n => Task.Run(() =>
{
// Long running operation
Thread.Sleep(TimeSpan.FromSeconds(5));

return n * n;
}).ToObservable())
.Switch()
.Subscribe(result =>
{
// Use result in some way
Console.WriteLine(result);
});

我如何在 Task.Run 调用中获取 CancellationToken,以便当 Switch 处理正在进行的计算的订阅时,它将 CancellationToken 设置为已取消所以我知道要中止计算。

最佳答案

您可以使用 Observable.StartAsync 方法,例如

inputStream.Select(n => Observable.StartAsync((token => Task.Run(() =>
{
if (token.IsCancellationRequested)
{
// .. don't need to do anything
return 0;
}
else
{
Thread.Sleep(TimeSpan.FromSeconds(1));
return n * n;

}
}))))
.Switch()
.Subscribe(Console.WriteLine);

或者,如果您要生成多个值,则可以使用与 Task 配合使用的 Observable.Create 重载来获取 CancellationToken。例如

inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() =>
{
while (!token.IsCancellationRequested)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
observer.OnNext(n * n);
}

observer.OnCompleted();
})))
.Switch()
.Subscribe(Console.WriteLine);

在您的任务中,您需要调用 OnNext 来生成值。任务的返回值(如果有)将被忽略。

关于c# - 取消长时间运行的 Rx 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14625438/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com