- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我正在尝试对(对我来说)不重要的 Rx 查询建模:
每个人都有以下属性:
class Man
{
public const int LookingAtNobody = 0;
public int Id { get; set; }
public double Location { get; set; }
public int LookingAt { get; set; }
}
每个女人都有以下属性:
class Woman
{
public int Id { get; set; }
public double Location { get; set; }
}
为了代表男人,我们有 IObservable<IObservable<Man>>
,并代表我们拥有的女性IObservable<IObservable<Woman>>
.
您将如何使用 Rx 生成从男性到他们正在看的女性的向量:IObservable<IObservable<Tuple<double,double>>>
?
为了提供帮助,这里有一些针对一些简单情况的单元测试:
public class Tests : ReactiveTest
{
[Test]
public void Puzzle1()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnCompleted<Man>(300));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(300),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
}
[Test]
public void Puzzle2()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnCompleted<Man>(400));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(350),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
}
[Test]
public void Puzzle3()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
OnCompleted<Man>(400));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(300),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
}
[Test]
public void Puzzle4()
{
var scheduler = new TestScheduler();
var m1 = scheduler.CreateHotObservable(
OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
OnCompleted<Man>(500));
var w1 = scheduler.CreateHotObservable(
OnNext(150, new Woman { Id = 10, Location = 10.0 }),
OnNext(250, new Woman { Id = 10, Location = 20.0 }),
OnCompleted<Woman>(350));
var w2 = scheduler.CreateHotObservable(
OnNext(155, new Woman { Id = 20, Location = 100.0 }),
OnNext(255, new Woman { Id = 20, Location = 200.0 }),
OnNext(355, new Woman { Id = 20, Location = 300.0 }),
OnCompleted<Woman>(455));
var men = scheduler.CreateHotObservable(OnNext(50, m1));
var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));
var results = runQuery(scheduler, women, men);
var innerResults = (from msg in results
where msg.Value.HasValue
select msg.Value.Value).ToArray();
var expectedVector1 = new[]
{
OnNext(200, Tuple.Create(2.0, 10.0)),
OnNext(250, Tuple.Create(2.0, 20.0)),
OnCompleted<Tuple<double,double>>(300),
};
var expectedVector2 = new[]
{
OnNext(300, Tuple.Create(3.0, 200.0)),
OnNext(355, Tuple.Create(3.0, 300.0)),
OnNext(400, Tuple.Create(4.0, 300.0)),
OnCompleted<Tuple<double,double>>(455),
};
ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
}
private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men)
{
// assuming nested sequences are hot
var vectors =
from manDuration in men
join womanDuration in women on manDuration equals womanDuration
select from man in manDuration
join woman in womanDuration on manDuration equals womanDuration
where man.LookingAt == woman.Id
select Tuple.Create(man.Location, woman.Location);
var query = vectors.Select(vectorDuration =>
{
var vectorResults = scheduler.CreateObserver<Tuple<double, double>>();
vectorDuration.Subscribe(vectorResults);
return vectorResults.Messages;
});
var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
return results;
}
}
(注意:这个问题交叉发布到 Rx 论坛:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)
最佳答案
如果我对您的理解正确,目标是创建一个“跟随可观察对象”的可观察对象,其中“跟随可观察对象”从男人开始看女人时开始,到男人停止看女人时结束。 “follow observable”应该包含男人和女人最近位置的元组。
这里的想法是使用CombineLatest
,它将接受两个可观察值,当它们中的任何一个产生一个值时,组合器将针对两个可观察值的最新值进行评估,从而产生一个组合可观察值中的值。但是,CombineLatest
仅在两个可观察对象都已完成时才完成。在这种情况下,我们希望在两个来源中的任何一个完成时完成可观察对象。为此,我们定义了以下扩展方法(我不认为这样的方法已经存在,但可能有更简单的解决方案):
public static IObservable<TSource>
UntilCompleted<TSource, TWhile>(this IObservable<TSource> source,
IObservable<TWhile> lifetime)
{
return Observable.Create<TSource>(observer =>
{
var subscription = source.Subscribe(observer);
var limiter = lifetime.Subscribe(next => { }, () =>
{
subscription.Dispose();
observer.OnCompleted();
});
return new CompositeDisposable(subscription, limiter);
});
}
此方法类似于TakeUntil
,但它不是一直持续到lifetime
产生一个值,而是一直持续到lifetime
完成。我们还可以定义一个简单的扩展方法,它采用满足谓词的第一个连胜:
public static IObservable<TSource>
Streak<TSource>(this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate);
}
现在对于最终查询,我们使用 CombineLatest
将所有男性和所有女性组合在一起,并使用 UntilCompleted
尽早完成该可观察对象。为了获得“follow observables”,我们选择男人注视女人的那一连串。然后我们简单地将其映射到一个位置元组。
var vectors =
from manDuration in men
from womanDuration in women
select manDuration
.CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
.UntilCompleted(womanDuration)
.UntilCompleted(manDuration)
.Streak(pair => pair.Man.LookingAt == pair.Woman.Id)
.Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location));
这通过了您的所有测试,但它无法处理男人看女人 10 一会儿,然后看 20 一会儿,然后再看 10 一会儿的场景;仅使用第一个 连胜。要观察所有条纹,我们可以使用以下扩展方法,它返回条纹的可观察值:
public static IObservable<IObservable<TSource>>
Streaks<TSource>(this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return Observable.Create<IObservable<TSource>>(observer =>
{
ReplaySubject<TSource> subject = null;
bool previous = false;
return source.Subscribe(x =>
{
bool current = predicate(x);
if (!previous && current)
{
subject = new ReplaySubject<TSource>();
observer.OnNext(subject);
}
if (previous && !current) subject.OnCompleted();
if (current) subject.OnNext(x);
previous = current;
}, () =>
{
if (subject != null) subject.OnCompleted();
observer.OnCompleted();
});
});
}
通过只订阅一次源流,并使用 ReplaySubject
,此方法适用于热和冷可观察对象。现在对于最终查询,我们按如下方式选择所有条纹:
var vectors =
from manDuration in men
from womanDuration in women
from streak in manDuration
.CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
.UntilCompleted(womanDuration)
.UntilCompleted(manDuration)
.Streaks(pair => pair.Man.LookingAt == pair.Woman.Id)
select streak.Select(pair =>
Tuple.Create(pair.Man.Location, pair.Woman.Location));
关于c# - 加入 Rx 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9758083/
我用 Cocoapods 创建了一个简单的项目并安装了 RxSwift 和 RxCocoa。 我写了这段代码: import UIKit import RxSwift class ViewContro
我对 Rx 真的很陌生,只是想知道 subscribeOn 的顺序如何影响 Observable //This will not print anything Observable.just("wha
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。 例子 observable A 每秒发出一次“滴答声” observable B 应该每 5 秒
我有以下流。 Observable.just(Unit) // execute immediately .mergeWith(tryAgainRelay) // execute again w
我可以看到 RX 适用于 Android 和 UI 事件处理。我很难看到 RX 在后端提供了什么好处。 RX Java 是为后端处理而设计的,还是这个概念太过分了? 最佳答案 实际上,RxJava 最
我有一个关于什么是更好的 RxJava 模式以保持可观察状态的问题。 为简单起见,假设我们有一个 StateManager需要跟踪系统中某个状态(假设它是一个简单的 bool 标志)并以可观察的方式公
我遇到了以下问题。我有一个从具有 isMember 属性的服务器请求的基本对象。如果该属性为 true,我想向端点 1、2 和 3 发出请求。 如果该属性为 false,我只想向端点 1 发出请求。
我正在使用 RxJava,但也许能够翻译另一个实现的答案。 我有一个 Observable 发出一系列项目,我想将它们分成 10 组。如下所示: observable .buffer(10)
有没有办法计算 RxAndroid 中流中已处理的元素数量? 我有类似的事情: Observable.fromArray(new String[]{"these", "are", "my", "val
如果我的客户端断开连接(错误)超过 10 秒,我会尝试显示一个弹出窗口。但是,当重新连接时,我也会关闭弹出窗口(true)。如果为真,我需要立即关闭弹出窗口。 我认为我需要做的是根据值(假)进行去抖,
收到 onNext() 后如何自动退订? 现在我使用这个代码: rxObservable .compose(bindToLifecycle()) // unsubscribe automaticall
我在 Ettus x310 上有一个简单的 C++ 测试程序,以前可以用,但现在不行了。我试图简单地设置单个 USRP 的两个 channel 的两个中心频率。当我尝试在第二个 channel 上设置
我有一个返回 Observable 的网络调用,我有另一个网络调用,它不是依赖于第一个 Observable 的 rx,我需要以某种方式转换这一切都与 Rx 相关。 Observable respon
我正在尝试根据 Rxjava 中的某些条件创建组列表。 以下是我的回复: { "dates":[ { "date":18, "value":
RX 6800 XT是最近很强的一款显卡,让很多的用户都很喜爱,详细还有很多没有入手的用户对他不是很了解吧,下面就带来了RX 6800 XT评测、跑分、价格、参数、图片,快来一起看看吧。 RX
我有一个重复元素列表,比如: Observable source = Observable.just("A", "B", "A", "C", "C", "A"); 我想按它们的值以及它们出现的次数对它
如果我将 RX-main 包添加到 WPF 应用程序,我会安装以下依赖包: 接收接口(interface) 接收核心 Rx-Linq Rx 平台服务 主要接收 添加 Rx-main 后,可以安装两个与
com.myapp.test.debug E/MessageQueue-JNI:rx.exceptions.OnErrorNotImplementedException 在 rx.Observable
这个问题在这里已经有了答案: RxJava operator that emits if different from last (1 个回答) 关闭 5 年前。 我有一个 bool 型 Obser
首先,我在两个 shell 中同时运行 vnstat -tr -i eth0 和 iftop。奇怪的是,vnstat 中的 rx 比 iftop 中的峰值 rx 大得多。我该如何解决这个问题? vns
我是一名优秀的程序员,十分优秀!