- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
更新:查看底部的示例
我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给 OnNext
.可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的任务。这是一个实现。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject, 1.ToString());
new Subscriber(subject, 2.ToString());
new Subscriber(subject, 3.ToString());
//Run the loop for 3 seconds
await Task.Delay(3000);
}
class Publisher
{
public Publisher(IObserver<string> observer)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
observer.OnNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
}
}
输出:
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
IObserver
发送消息,但所有订阅都接收消息。
但是,我该如何分离 IObservable
和 IObserver
? 它们以
Subject
的形式粘在一起.这是另一种方法。
[TestMethod]
public async Task RunMessagingAsync2()
{
var observers = new List<IObserver<string>>();
var observable = Observable.Create(
(IObserver<string> observer) =>
{
observers.Add(observer);
Task.Run(async () =>
{
while (true)
{
try
{
observer.OnNext(GetSomeData());
}
catch (Exception ex)
{
observer.OnError(ex);
}
await Task.Delay(500);
}
});
return Disposable.Create(() => { });
});
//Create a class and inject the subject as IObservable
new Subscriber(observable);
new Subscriber(observable);
//Run the loop for 10 seconds
await Task.Delay(10000);
Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}
这里的问题是这会创建两个单独的
Task
s 和两个独立的
IObserver
s。每个订阅都会创建一个新的 IObserver。您可以确认,因为
Assert
这里失败了。这对我来说真的没有任何意义。根据我对响应式(Reactive)编程的理解,我不希望
Subscribe
方法在这里创建一个新的
IObserver
每一次。退房
this gist .这是对
Observable.Create example 的轻微修改.它显示了 Subscribe 方法如何在每次调用时创建 IObserver。
如何在不使用 Subject
的情况下实现第一个示例中的功能?
Subject
如果您愿意,可以从出版商那里购买,但这不是必需的。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
class Publisher
{
public Publisher(Action<string> onNext)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
onNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
//Listen for OnNext and write to the debug window when it happens
public void ReceiveMessage(string message) => Debug.WriteLine(message);
}
[TestMethod]
public async Task RunMessagingAsync()
{
//Create a class and inject the subject as IObservable
var subscriber = new Subscriber();
//Create a class and inject the subject as IObserver
new Publisher(subscriber.ReceiveMessage);
//Run the loop for 10 seconds
await Task.Delay(10000);
}
}
}
最后,我应该补充一点,ReactiveUI 曾经有一个
MessageBus class .我不确定它是否已被删除,但不再推荐它。他们建议我们改用什么?
Observable.Create
做同样的事情 ?
Observable.Create
的问题是它为每个订阅运行操作。这不是预期的功能。无论有多少订阅,这里的长时间运行的任务都只运行一次。
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnitTestProject1
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
internal class BasicObservable<T> : IObservable<T>
{
List<IObserver<T>> _observers = new List<IObserver<T>>();
public BasicObservable(
Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default
) =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
var data = getData();
_observers.ForEach(o => o.OnNext(data));
}
catch (Exception ex)
{
_observers.ForEach(o => o.OnError(ex));
}
}
_observers.ForEach(o => o.OnCompleted());
}, cancellationToken);
public IDisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return Disposable.Create(observer, (o) => _observers.Remove(o));
}
}
public static class ObservableExtensions
{
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, default, cancellationToken);
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, interval, cancellationToken);
}
[TestClass]
public class UnitTest1
{
string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
Func<string> getData = GetData;
var publisher = getData.CreateObservable(cancellationToken);
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; true; i++)
{
if (i >= 5)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
}
}
}
最佳答案
首先你必须熟悉"cold" and "hot" observables的理论。 .这是来自 Introduction to RX 的定义.
Observable.Create
方法创建冷观察。但是您可以使用
Publish
使任何可观察对象变得热。运算符(operator)。此运算符提供了一种方法,使多个独立观察者共享单个底层订阅。例子:
int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
_ = Task.Run(async () =>
{
while (true)
{
observer.OnNext(++index);
await Task.Delay(1000);
}
});
return Disposable.Empty;
});
IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop
hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));
coldObservable
创建者
Observable.Create
订阅时
hotObservable.Connect
方法被调用,然后该单个订阅生成的所有通知将传播到
hotObservable
的所有订阅者。 .
Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...
重要提示:上面例子的目的是为了演示
Publish
运营商,而不是作为高质量 RX 代码的示例。它的一个问题是,通过在连接到源之后订阅观察者在理论上可能不会将第一个通知发送给部分或全部观察者,因为它可能在订阅之前创建。换句话说,存在竞争条件。
IConnectableObservable
生命周期的替代方法。 , 运营商
RefCount
:
Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
var hotObservable = coldObservable.Publish().RefCount();
这样你就不需要
Connect
手动。连接在第一次订阅时自动发生,并在最后一次取消订阅时自动处理。
关于c# - 如何分离 IObservable 和 IObserver,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64508362/
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
我有一个鼠标左键状态流: var leftMouseButton = mouse.Select(x => x.LeftButton).DistinctUntilChanged(); 然后我Window
我有一个“值(value)观”IObservable这是返回 T必须按顺序组合成可变长度数组的元素,我有一个“控制”IObservable这告诉我下一个数组必须有多长。删除一个元素、重复它或将结果打乱
微软推出了 IObservable interface到 BCL 与 .NET Framework 4,我想,“太棒了,终于,我必须使用它!”因此,我深入挖掘并阅读帖子和文档,甚至实现了该模式。 这样
我有一个 IObservable 类型的流(热可观察)并想将其分成 IObservable 的两个可观察对象和 IObservable 我天真地尝试了以下但我只得到 flowStream人口稠密。 I
在 Windows Phone 7 上,IObservable 有一个新版本的 BufferWithTimeOrCount 扩展方法,它返回“流的流”而不是以前的“列表流”。我在尝试使用新方法或旧方法
更新:查看底部的示例 我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给 OnNext .可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的
我想创建一个可用于表示动态计算值的类,而另一个表示值的类可以是这些动态计算值的源(主题)。目标是当主题发生变化时,计算值会自动更新。 在我看来,使用 IObservable/IObserver 是可行
我有一个返回接口(interface) IObservable 的方法(在 silverlight 中)并希望将其转换为另一个 IObservable ? 那么我需要用什么来代替“CONVERT_SO
这两种方法有什么区别,每种方法的最佳情况是什么?我知道他们都能够附加一个函数来处理来自 IObservable 的排放,但我并不真正理解除此之外的差异。 编辑 对不起,我应该指定的。 IObserva
我的系统有很多状态对象——连接状态、CPU 负载、登录用户等等。所有此类事件都合并到单个可观察流中。 我想制作一个管理实用程序来显示系统的实际状态并显示所有这些计数器。 我如何创建一个包含所有计数器的
我有一个 IObservable我把它变成一个IObservable使用一些中间步骤: var observedXDocuments = from b in observedBytes
Although it is possible to attach an observer to multiple providers, the recommended pattern is to a
在我当前正在开发的系统中,我有许多被定义为接口(interface)和基类的组件。系统的每个部分都有一些与系统其他部分交互的特定点。 例如,数据准备组件准备了一些数据,最终需要进入数据处理部分,通信组
我有一个 IObservable包含 XML 文档(的片段)。我想把一个变成另一个。因此,例如,假设我有以下从我的 IObservable 推送的片段(每行包含一个片段): 获取以下文件: 我一
我有一个 IObservable这给了我字节数组中不确定数量的字节。我想知道我是如何从那开始返回 IObservable 的每个字节数组中有一定数量的字节。假设我们一次需要 10 个字节。 也就是说,
我在我的一个项目中使用了 IObserver/IObservable 接口(interface)。 CommandReader 是一个 IObservable,它不断从流中读取数据,然后将其传递给它的
例如,考虑一下: public IDisposable Subscribe(IObserver observer) { return eventStream.Where
当我写 .Subscribe 时我经常发现 Resharper 为我选择了以下重载,位于 mscorlib 中,Version=4.0.0.0: namespace System { public
我是一名优秀的程序员,十分优秀!