gpt4 book ai didi

c# - 在 RX 中有没有办法将源流绑定(bind)到目标流,以便可以在不影响客户端订阅的情况下更改源?

转载 作者:太空狗 更新时间:2023-10-30 01:18:57 26 4
gpt4 key购买 nike

我正在使用 RX,我想将源流绑定(bind)/映射到目标流,以便可以动态更改源流,而不会影响对目标流的任何订阅。

我将在这里布置我的(天真的)解决方案,希望有人能向我展示更好的解决方案。

我希望可以组合现有的扩展方法来实现此结果。如果没有,我希望制作一个自定义扩展方法来简化我的解决方案。

/// <summary>
/// Used to bind a source stream to destination stream
/// Clients can subscribe to the destination stream before the source stream has been bound.
/// The source stream can be changed as desired without affecting the subscription to the destination stream.
/// </summary>
public class BindableStream<T>
{
/// <summary>
/// The source stream that is only set when we bind it.
/// </summary>
private IObservable<T> sourceStream;

/// <summary>
/// Used to unsubscribe from the source stream.
/// </summary>
private IDisposable sourceStreamDisposer;

/// <summary>
/// Subject used as the destination stream.
/// For passing data from source to dest stream.
/// </summary>
private Subject<T> destStream = new Subject<T>();

/// <summary>
/// Get the destination stream. Clients can subscribe to this to receive data that is passed on from the source stream.
/// Later on we can set or change the underlying source stream without affecting the destination stream.
/// </summary>
public IObservable<T> GetDestStream()
{
return destStream;
}

/// <summary>
/// Bind to a source stream that is to be propagated to the destination stream.
/// </summary>
public void Bind(IObservable<T> sourceStream)
{
Unbind();

this.sourceStream = sourceStream;
this.sourceStreamDisposer = sourceStream.Subscribe(dataItem =>
{
//
// Pass the source item on to the client via the subject.
//
destStream.OnNext(dataItem);
});
}

/// <summary>
/// Unsubscribe from the source stream.
/// </summary>
public void Unbind()
{
if (sourceStreamDisposer != null)
{
sourceStreamDisposer.Dispose();
}

sourceStreamDisposer = null;
sourceStream = null;
}

}

这是一个非常简单的例子,说明如何使用它:

static void Main(string[] args)
{
var bindableStream = new BindableStream<long>();

// Subscribe before binding the source stream.
bindableStream.GetDestStream().Subscribe(i => Console.WriteLine(i));

Thread.Sleep(1000);

// Bind a source stream.
bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));

Thread.Sleep(5000);

// Bind a new source stream.
bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));

Console.ReadKey();
}

最佳答案

您可以使用 Observable.Switch(...) 运算符来获得您想要的内容。

Switch 创建“滚动”订阅。当产生一个新的 Observable 时,它​​会处理其对前一个 Observable 的订阅,并订阅新的 Observable。

static void Main(string[] args)
{
var streams = new Subject<IObservable<long>>();

// Subscribe before binding the source stream.
streams.Switch().Subscribe(Console.WriteLine);

Thread.Sleep(1000);

// Bind a source stream.
streams.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

Thread.Sleep(5000);

// Bind a new source stream.
streams.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

Console.ReadKey();
}

或者,如果您知道您的“流”来自哪里...

static void Main(string[] args)
{
var interval = Observable.IntervalTimeSpan.FromSeconds(1));

var sourcesOvertime = new [] {
// Yield the first source after one second
Observable.Return(interval).Delay(TimeSpan.FromSeconds(1)),
// Yield the second source after five seconds
Observable.Return(interval).Delay(TimeSpan.FromSeconds(5))
};

sourcesOvertime
// merge these together so we end up with a "stream" of our source observables
.Merge()
// Now only listen to the latest one.
.SwitchLatest()
// Feed the values from the latest source to the console.
.Subscribe(Console.WriteLine);

Console.ReadKey();
}

编辑:

作为 BindableStream 类的简化...

static void Main(string[] args)
{
// var bindableStream = new BindableStream<long>();
var bindableStream = new Subject<IObservable<long>>();
var dest = bindableStream.Switch();

// Subscribe before binding the source stream.
// bindableStream.Subscribe(i => Console.WriteLine(i));
dest.Subscribe(i => Console.WriteLine(i));

Thread.Sleep(1000);

// Bind a source stream.
// bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
bindableStream.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

Thread.Sleep(5000);

// Bind a new source stream.
// bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
bindableStream.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));

Thread.Sleep(4000);

Console.WriteLine("Unbound!");

// Unbind the source and dest streams.
// bindableStream.Unbind();
bindableStream.OnNext(Observable.Empty<long>());

Console.ReadKey();
}

或者如果那太冗长...

public static class SubjectEx
{
public static class OnNextEmpty<T>(this ISubject<IObservable<T>> subject)
{
subject.OnNext(Observable.Empty<T>());
}
}

关于c# - 在 RX 中有没有办法将源流绑定(bind)到目标流,以便可以在不影响客户端订阅的情况下更改源?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25101108/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com