- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我一直在努力创建一个 IObservable<T>
使用 Reactive Extensions 实现对于 Twitter's streaming APIs .
从高层发送 HTTP 请求并且连接保持打开状态。带长度前缀的项目将被发送以供消耗。
基本上,这是循环调用 Stream.ReadAsync
使用 await
keyword 。安 IObserver<T>
interface实现(来自 Observable.Create
或来自 Dataflow library 的 block ,没关系,它是实现细节)传递到此循环,然后调用 IObserver<T>
上的方法实现,产生可观察的。
但是,在此循环开始处理之前必须完成许多事情,这些事情需要调用 Task<T>
-返回方法,所有这些方法在 C# 5.0 中都可以使用 await
更轻松地调用关键词。像这样的事情:
public async Task<IObservable<string>> Create(string parameter,
CancellationToken cancellationToken)
{
// Make some call that requires await.
var response = await _httpClient.SendAsync(parameter, cancellationToken).
ConfigureAwait(false);
// Create a BufferBlock which will be the observable.
var block = new BufferBlock<T>();
// Start some background task which will use the block and publish to it
// on a separate task. This is not something that is awaited for.
ConsumeResponse(response, block, cancellationToken);
// Create the observable.
return block.AsObservable();
}
也就是说,我目前正在返回 Task<IObservable<T>>
从我的方法来看,但我觉得我在 react 性扩展中遗漏了一些东西,它允许我使用 await
为了方便我需要调用的电话,还返回 IObservable<T>
而不是Task<IObservable<T>>
.
响应式扩展中的什么方法允许我创建一个在从创建方法返回之前需要等待方法的可观察对象?
我发现的最接近的是Observable.DeferAsync
。假设对我的方法的调用和可观察对象的使用类似于:
public async Task Observe()
{
// NOT the real name of the interface, but explains it's role here.
IObservableFactory factory;
// Create is really named something else.
IObservable<string> observable = factory.Create("parameter");
// Subscribe.
observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));
// Wait.
await observable;
}
使用DeferAsync
在这里不起作用,因为调用 Subscribe
将发送第一个请求,然后读取该请求,然后调用 await
上observable
将创建第二个订阅,但是订阅不同的可观察值。
或者,最终,返回Task<IObservable<T>>
在响应式框架中执行此操作的适当方法?
随后,由于该方法返回 Task<T>
,通过 CancellationToken
是一个很好的做法。取消操作。也就是说,我可以理解 CancellationToken
用于取消可观察对象的创建,但它也应该用于取消实际可观察对象(因为它可以向下传递以读取流等) .
我的直觉告诉我不行,因为取消违反了关注点分离以及 DRY 原则:
Subscribe
将返回 IDisposable
实现这将取消订阅。最佳答案
我不会返回 Task<IObservable<T>>
。在公共(public) API 中混合任务和可观察对象最终会变得困惑。请记住,任务可以被视为产生单个值的可观察量。这也意味着不要将 CancellationToken 与公共(public) API 中的可观察量混合在一起。您可以通过订阅和取消订阅来控制可观察量。
这并不意味着您不能混合幕后的概念。以下是如何使用 Observable.Using
执行您想要的操作, Task.ToObservable
和CancellationDisposable
首先,修改您的方法以返回 Task<ISourceBlock<string>>
:
public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
// Make some call that requires await.
var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);
// Create a BufferBlock which will be the observable.
var block = new BufferBlock<T>();
// Start some background task which will use the block and publish to it
// on a separate task. This is not something that is awaited for.
ConsumeResponse(response, block, cancellationToken);
return block;
}
现在这是使用上述方法的新 Create 方法:
public IObservable<string> Create(string parameter)
{
// Create a cancellation token that will be canceled when the observable is unsubscribed, use this token in your call to CreateBlock.
// Use ToObservable() to convert the Task to an observable so we can then
// use SelectMany to subscribe to the block itself once it is available
return Observable.Using(() => new CancellationDisposable(),
cd => CreateBlock(parameter, cd.Token)
.ToObservable()
.SelectMany(block => block.AsObservable()));
}
编辑:我发现 Rx 已经用 FromAsync
实现了这个模式。 :
public IObservable<string> Create(string parameter)
{
return Observable.FromAsync(token => CreateBlock(parameter, token))
.SelectMany(block => block.AsObservable());
}
还有,DeferAsync
,这更合适,因为您的 Task
实际上是在创建你真正想要观察的 Observable(例如你的 block ):
public IObservable<string> Create(string parameter)
{
return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}
关于.net - Rx 框架中的什么允许我在创建期间等待其他方法时返回 IObservable<T> ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15730531/
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
我有一个鼠标左键状态流: var leftMouseButton = mouse.Select(x => x.LeftButton).DistinctUntilChanged(); 然后我Window
我有一个“值(value)观”IObservable这是返回 T必须按顺序组合成可变长度数组的元素,我有一个“控制”IObservable这告诉我下一个数组必须有多长。删除一个元素、重复它或将结果打乱
微软推出了 IObservable interface到 BCL 与 .NET Framework 4,我想,“太棒了,终于,我必须使用它!”因此,我深入挖掘并阅读帖子和文档,甚至实现了该模式。 这样
我有一个 IObservable 类型的流(热可观察)并想将其分成 IObservable 的两个可观察对象和 IObservable 我天真地尝试了以下但我只得到 flowStream人口稠密。 I
在 Windows Phone 7 上,IObservable 有一个新版本的 BufferWithTimeOrCount 扩展方法,它返回“流的流”而不是以前的“列表流”。我在尝试使用新方法或旧方法
更新:查看底部的示例 我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给 OnNext .可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的
我想创建一个可用于表示动态计算值的类,而另一个表示值的类可以是这些动态计算值的源(主题)。目标是当主题发生变化时,计算值会自动更新。 在我看来,使用 IObservable/IObserver 是可行
我有一个返回接口(interface) IObservable 的方法(在 silverlight 中)并希望将其转换为另一个 IObservable ? 那么我需要用什么来代替“CONVERT_SO
这两种方法有什么区别,每种方法的最佳情况是什么?我知道他们都能够附加一个函数来处理来自 IObservable 的排放,但我并不真正理解除此之外的差异。 编辑 对不起,我应该指定的。 IObserva
我的系统有很多状态对象——连接状态、CPU 负载、登录用户等等。所有此类事件都合并到单个可观察流中。 我想制作一个管理实用程序来显示系统的实际状态并显示所有这些计数器。 我如何创建一个包含所有计数器的
我有一个 IObservable我把它变成一个IObservable使用一些中间步骤: var observedXDocuments = from b in observedBytes
Although it is possible to attach an observer to multiple providers, the recommended pattern is to a
在我当前正在开发的系统中,我有许多被定义为接口(interface)和基类的组件。系统的每个部分都有一些与系统其他部分交互的特定点。 例如,数据准备组件准备了一些数据,最终需要进入数据处理部分,通信组
我有一个 IObservable包含 XML 文档(的片段)。我想把一个变成另一个。因此,例如,假设我有以下从我的 IObservable 推送的片段(每行包含一个片段): 获取以下文件: 我一
我有一个 IObservable这给了我字节数组中不确定数量的字节。我想知道我是如何从那开始返回 IObservable 的每个字节数组中有一定数量的字节。假设我们一次需要 10 个字节。 也就是说,
我在我的一个项目中使用了 IObserver/IObservable 接口(interface)。 CommandReader 是一个 IObservable,它不断从流中读取数据,然后将其传递给它的
例如,考虑一下: public IDisposable Subscribe(IObserver observer) { return eventStream.Where
当我写 .Subscribe 时我经常发现 Resharper 为我选择了以下重载,位于 mscorlib 中,Version=4.0.0.0: namespace System { public
我是一名优秀的程序员,十分优秀!