- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 RIA 服务数据服务,它有几个函数调用,如下所示:
public InvokeOperation<T> SomeFunc(
SomeData data,
Action<InvokeOperation<T>> callback,
object userState)
我如何将其与 Reactive Extensions 一起使用,以便我可以订阅回调并获得 InvokeOperation 结果?
更新:这是我目前对 Enigmativity 混合解决方案的实现。我需要实际的 InvokeOperation 而不仅仅是值,因为 InvokeOperation UserState 可能很有值(value)。需要注意的是,我根本没有测试过错误处理。
public static class ObservableEx
{
public static IObservable<InvokeOperation<T>> ObservableInvokeOperation<T, Tdat> (
Func<Tdat, Action<InvokeOperation<T>>, object, InvokeOperation<T>> func,
Tdat data,
System.Reactive.Concurrency.IScheduler scheduler )
{
return
Observable.Defer<InvokeOperation<T>>( () =>
FromCallbackPattern<Tdat, T>( func, scheduler )
.Invoke( data ) );
}
private static Func<P, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T> (
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler )
{
return p =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call( p, iot =>
{
if ( iot.HasError )
{
subject.OnError( iot.Error );
}
else
{
subject.OnNext( iot );
subject.OnCompleted();
}
}, p );
}
catch ( Exception ex )
{
subject.OnError( ex );
}
return subject.ObserveOn( scheduler );
};
}
}
使用给定函数
public InvokeOperation<int> SomeFunc(SomeData data, Action<InvokeOperation<int>> callback, object userState)
var myobs = ObservableEx.ObservableInvokeOperation<int, SomeData>( myRIAContext.SomeFunc, data, Scheduler.ThreadPool );
这对匹配给定函数签名的任何函数都非常有效。不幸的是,现在我遇到了一些变体,例如
Func<T1, Action<InvokeOperation<T>>, object>
Func<T1, T2, Action<InvokeOperation<T>>, object>
任何人有任何建议可以将其转换为能够处理我想抛给它的任何 InvokeOperation 方法吗?
最佳答案
EDIT1:请参阅下文,了解基于 Paul Betts 的回答和我的混合解决方案。
EDIT2:请参阅下文,了解基于 OP 更新的“第三代”解决方案。
回调有点难以处理,我必须说将它变成一个可观察的是一个很好的方法。
我有一个适合我的方法。
基本的方法是把SomeFunc
运算成Func<T>
然后调用Observable.Start
在那上面。我把它包装在 Observable.Create
中为了保持干净,我添加了错误处理。我已经完成了基本测试,但没有太可靠的测试。
使用代码如下所示:
var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });
我假设您的 RIA 服务类是 RiaService<T>
并 build 了SomeObservableFunc
像这样的扩展方法:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return Observable.Create<T>(o =>
{
var error = (Exception)null;
Func<T> call = () =>
{
var result = default(T);
var mre = new ManualResetEvent(false);
Action<InvokeOperation<T>> callback = iot =>
{
try
{
if (iot.HasError)
{
error = iot.Error;
}
else
{
result = iot.Value;
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
mre.Set();
}
};
try
{
service.SomeFunc(data, callback, null);
mre.WaitOne();
}
catch (Exception ex)
{
error = ex;
}
return result;
};
return Observable
.Start(call, scheduler)
.Subscribe(t =>
{
try
{
if (error == null)
{
o.OnNext(t);
}
else
{
o.OnError(error);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
}, ex => o.OnError(ex), () =>
{
if (error == null)
{
o.OnCompleted();
}
});
});
}
如果这对你有用,请大声说出来。
我喜欢 Paul Betts 的解决方案,因为它没有使用 ManualResetEvent
,但它没有编译,也没有处理在 RIA 服务调用期间可能发生的内部错误,因此我创建了以下混合解决方案。
我的扩展方法现在看起来像这样:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return
Observable.Defer<T>(() =>
FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
.Invoke(data));
}
这使用了重做的 FromCallbackPattern
最初由 Paul Betts 创建:
private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return p =>
{
var subject = new AsyncSubject<T>();
try
{
call(p, iot =>
{
if (iot.HasError)
{
subject.OnError(iot.Error);
}
else
{
subject.OnNext(iot.Value);
subject.OnCompleted();
}
}, null);
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
};
}
它适用于我的测试代码,我认为这是一个更好的整体解决方案。
此版本的解决方案旨在允许将不同数量的参数和用户状态传递给 FromCallbackPattern
扩展方法。
我从这个通用目的开始FromCallbackPattern
扩展方法:
public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this Action<Action<InvokeOperation<T>>> call,
IScheduler scheduler)
{
return Observable.Defer(() =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call(iot =>
{
subject.OnNext(iot);
subject.OnCompleted();
});
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
});
}
然后我需要一系列私有(private)Reduce
将各种服务调用减少到 Action<Action<InvokeOperation<T>>>
的扩展方法代表们:
private static Action<Action<InvokeOperation<T>>> Reduce<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
object state)
{
return a => call(a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P p, object state)
{
return a => call(p, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, object state)
{
return a => call(p1, p2, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, object state)
{
return a => call(p1, p2, p3, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state)
{
return a => call(p1, p2, p3, p4, a, state);
}
现在我可以写普通的FromCallbackPattern
扩展方法:
public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return o => call.Reduce(o).FromCallbackPattern(scheduler);
}
public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
}
然后,最后,原来的 SomeObservableFunc
/ObservableInvokeOperation
扩展方法(现在也重命名为 FromCallbackPattern
):
public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this RiaService<T> service,
Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P p, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, p4, state));
}
显然您需要替换对 RiaService<T>
的引用使用您的 RIA 服务类类型。
这些方法可以这样调用:
IObservable<InvokeOperation<int>> obs1 =
service
.FromCallbackPattern(
s => s.SomeFunc,
new SomeData(),
null, // user state
Scheduler.ThreadPool);
IObservable<InvokeOperation<int>> obs2 =
service
.FromCallbackPattern(
s => s.SomeOtherFunc,
42, "Hello", 3.14159265,
null, // user state
Scheduler.ThreadPool);
呸!现在怎么样了?
关于system.reactive - 如何将带有 InvokeOperation 回调的此函数转换为 Reactive Extensions?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7802135/
类型‘AbstractControl’上不存在属性‘Controls’。
主要是我很好奇。 我们有一个名为 Unit 的对象在我们的代码库中 - 代表桥梁或道路的组件。在我们的例子中,看到带有 Unit 的 ReactiveUI 命令可能会模棱两可。作为声明中的泛型之一。
我一直听说六边形架构必须与任何框架无关,并使用接口(interface) (SPI) 来委托(delegate)不属于业务层的每个代码部分。 但是如何在不使用额外框架的情况下通过六边形架构创建一个响应
我读了 Reactive Manifesto . 但我无法理解 event driven architectures 之间的核心差异和 message driven architectures .结果
申请要求: 订阅两个事件流 A 和 B 对于每个 A 事件,一段时间后应该有相应的 B 事件 如果没有相应的 B 到达(及时),应用程序会监视 A 事件并发出警报 B 事件可以以与 A 事件不同的顺序
Closed. This question is opinion-based。它当前不接受答案。 想改善这个问题吗?更新问题,以便editing this post用事实和引用来回答。 4年前关闭。
我有一个 ViewModel,它在其初始化程序中有一个输入 init(sliderEvents: Reactive) { 在测试中我想做类似的事情 slider.send(.touchDownInsi
经典实时搜索示例: var searchResults = from input in textBoxChanged from results in GetDa
我有一个响应式(Reactive)管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing )。 完成后,我需要将发生的事情通知一些外部
是否可以为响应式扩展实现基于硬件计时器的自定义调度程序?我该如何开始,有什么好的例子吗? 我有一个硬件可以每毫秒向我发送一个准确的中断。我想利用它来创建更精确的 RX 调度程序。 更新 感谢 Asti
我正在通过网络浏览 Rx 框架 Material ,我发现了很多。 现在,每当我为此在 google 上搜索时,我还会在 wikipedia 链接中找到“响应式(Reactive)编程”。 由于响应式
关闭。这个问题是opinion-based .它目前不接受答案。 想改进这个问题?更新问题,以便 editing this post 可以用事实和引用来回答它. 6年前关闭。 Improve this
SignalR 与响应式扩展是同一回事吗?你能解释一下为什么或为什么不吗? 最佳答案 不,它们绝对不是同一件事。 Reactive Extensions 是一个用于创建和组合可观察的数据流或事件流的库
我知道有一种简单的方法可以做到这一点 - 但今晚它打败了我...... 我想知道两个事件是否在 300 毫秒内发生,就像双击一样。 在 300 毫秒内单击两次左键鼠标 - 我知道这是构建响应式(Rea
我们的应用程序使用 Reactive Extensions (Rx)。这些通常通过 Microsoft 的可下载包安装。但是,当我们发布应用程序时,我们会提供 dll 的副本(即 System.Cor
我想了解 Reactive 和 ReactiveStreams 之间的区别,特别是在 RxJava 的上下文中? 我能想到的最多的是 Reactive Streams 在规范中有一些背压的概念,但它已
我想探索来自 Quarkus 的响应式 REST 客户端的慢速后端,并在他们建议的样本 (https://github.com/quarkusio/quarkus-quickstarts/tree/m
假设我有一个存储桶,我需要从中获取日期早于现在的文档。 该文档如下所示: { id: "1", date: "Some date", otherObjectKEY: "key1" } 对于每个文档,我
我有一个 RIA 服务数据服务,它有几个函数调用,如下所示: public InvokeOperation SomeFunc( SomeData data, Action> callb
我一直在使用 Rx 在单个应用程序中创建事件总线(想想 CQRS/ES),它似乎工作得很好。然而,在调查了一堆不同的事件溯源框架之后,我还没有看到使用过一次 Rx。与基于反射/容器的调度程序相比,它似
我是一名优秀的程序员,十分优秀!