gpt4 book ai didi

system.reactive - 如何将 n 个 observable 动态组合成一个列表?

转载 作者:行者123 更新时间:2023-12-04 15:15:03 25 4
gpt4 key购买 nike

我有一组 observables 为所谓的 Channel 生成状态变化。 .我有一个 ChannelSet应该监控这些 channel 。

我想写这样的东西:如果一个 channel 是可操作的,则 channel 集已启动,否则, channel 集已关闭。

IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
channelSet.ChannelSetState = ChannelSetState.Up;
else
channelSet.ChannelSetState = ChannelSetState.Down;

但是我从哪里得到我的 IEnumerable<ChannelState> ?如果我有 1 个 channel ,我可以简单地订阅它的状态更改并相应地修改 channel 集的状态。对于两个 channel ,我可以使用 CombineLatest :
Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
{
if (cs0 == ChannelSetState.Up || cs1 == ChannelSetState.Up)
return ChannelSetState.Up;
else
return ChannelSetState.Down;
});

但我有一个 IEnumerable<Channel>和相应的 IEnumerable<IObservable<ChannelState>> .我正在寻找类似 CombineLatest 的东西这不仅限于固定数量的可观察量。

更复杂的是,可以添加和删除 channel 集合。因此,偶尔会添加一个 channel ,例如。新 channel 还会生成需要合并的状态更改。

所以我真正要找的是一个函数:
IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

当输入变化时保持最新。应该有一些方法可以使用 Rx 来实现这一点,但我真的不知道怎么做。

最佳答案

有一种相当直接的方法可以用 Rx 做你想做的事,但你需要只考虑可观察量,而不是混合可枚举量。

您真正需要考虑的函数签名是:

IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

这是函数:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();

重要的是每个 IObservable<ChannelState>IObservable<IObservable<ChannelState>>行为正确才能使其正常工作。

我假设 ChannelState枚举有一个 Idle状态和每个 IObservable<ChannelState>将产生零对或多对 Operational/ Idle值( Operational 后跟 Idle ),然后再完成。

您还说“可以添加和删除 channel 的集合” - 从 IEnumerable<IObservable<ChannelState>> 的角度思考这听起来很合理——但在 Rx 中你不必担心删除,因为每个 observable 都可以发出自己的完成信号。一旦它发出完成信号,就好像它已从集合中删除,因为它不能产生任何进一步的值。所以你只需要担心添加到集合中 - 这很容易使用主题。

所以现在这个函数可以像这样使用:
var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);

channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });

channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc

我使用一些测试代码运行了这个,它使用了三个随机 ChannelState可观察值,带有 Do调用 f用于调试的函数,并得到以下序列:
1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down

我认为这就是你所追求的。如果我错过了什么,请告诉我。

根据下面的评论, ChannelState枚举有多个状态,但只有 Operational表示连接已建立。所以很容易添加一个 DistinctUntilChanged操作符隐藏多个“关闭”状态。这是现在的代码:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();

添加了代码以确保第一个选择查询始终以 1 开头.这是现在的代码:
Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
channelStates =>
channelStates
.Merge()
.Select(cs => cs == ChannelState.Operational ? 1 : -1)
.StartWith(1)
.DistinctUntilChanged()
.Scan(0, (cssn, csn) => cssn + csn)
.Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
.DistinctUntilChanged();

关于system.reactive - 如何将 n 个 observable 动态组合成一个列表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6812251/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com