- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 RIA 服务数据服务,它有几个函数调用,如下所示:
public InvokeOperation<T> SomeFunc(
SomeData data,
Action<InvokeOperation<T>> callback,
object userState)
我如何将其与 Reactive Extensions 一起使用,以便我可以订阅回调并获得 InvokeOperation 结果?
更新:这是我目前对 Enigmativity 混合解决方案的实现。我需要实际的 InvokeOperation 而不仅仅是值,因为 InvokeOperation UserState 可能很有值(value)。需要注意的是,我根本没有测试过错误处理。
public static class ObservableEx
{
public static IObservable<InvokeOperation<T>> ObservableInvokeOperation<T, Tdat> (
Func<Tdat, Action<InvokeOperation<T>>, object, InvokeOperation<T>> func,
Tdat data,
System.Reactive.Concurrency.IScheduler scheduler )
{
return
Observable.Defer<InvokeOperation<T>>( () =>
FromCallbackPattern<Tdat, T>( func, scheduler )
.Invoke( data ) );
}
private static Func<P, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T> (
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler )
{
return p =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call( p, iot =>
{
if ( iot.HasError )
{
subject.OnError( iot.Error );
}
else
{
subject.OnNext( iot );
subject.OnCompleted();
}
}, p );
}
catch ( Exception ex )
{
subject.OnError( ex );
}
return subject.ObserveOn( scheduler );
};
}
}
使用给定函数
public InvokeOperation<int> SomeFunc(SomeData data, Action<InvokeOperation<int>> callback, object userState)
var myobs = ObservableEx.ObservableInvokeOperation<int, SomeData>( myRIAContext.SomeFunc, data, Scheduler.ThreadPool );
这对匹配给定函数签名的任何函数都非常有效。不幸的是,现在我遇到了一些变体,例如
Func<T1, Action<InvokeOperation<T>>, object>
Func<T1, T2, Action<InvokeOperation<T>>, object>
任何人有任何建议可以将其转换为能够处理我想抛给它的任何 InvokeOperation 方法吗?
最佳答案
EDIT1:请参阅下文,了解基于 Paul Betts 的回答和我的混合解决方案。
EDIT2:请参阅下文,了解基于 OP 更新的“第三代”解决方案。
回调有点难以处理,我必须说将它变成一个可观察的是一个很好的方法。
我有一个适合我的方法。
基本的方法是把SomeFunc
运算成Func<T>
然后调用Observable.Start
在那上面。我把它包装在 Observable.Create
中为了保持干净,我添加了错误处理。我已经完成了基本测试,但没有太可靠的测试。
使用代码如下所示:
var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });
我假设您的 RIA 服务类是 RiaService<T>
并 build 了SomeObservableFunc
像这样的扩展方法:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return Observable.Create<T>(o =>
{
var error = (Exception)null;
Func<T> call = () =>
{
var result = default(T);
var mre = new ManualResetEvent(false);
Action<InvokeOperation<T>> callback = iot =>
{
try
{
if (iot.HasError)
{
error = iot.Error;
}
else
{
result = iot.Value;
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
mre.Set();
}
};
try
{
service.SomeFunc(data, callback, null);
mre.WaitOne();
}
catch (Exception ex)
{
error = ex;
}
return result;
};
return Observable
.Start(call, scheduler)
.Subscribe(t =>
{
try
{
if (error == null)
{
o.OnNext(t);
}
else
{
o.OnError(error);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
}, ex => o.OnError(ex), () =>
{
if (error == null)
{
o.OnCompleted();
}
});
});
}
如果这对你有用,请大声说出来。
我喜欢 Paul Betts 的解决方案,因为它没有使用 ManualResetEvent
,但它没有编译,也没有处理在 RIA 服务调用期间可能发生的内部错误,因此我创建了以下混合解决方案。
我的扩展方法现在看起来像这样:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return
Observable.Defer<T>(() =>
FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
.Invoke(data));
}
这使用了重做的 FromCallbackPattern
最初由 Paul Betts 创建:
private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return p =>
{
var subject = new AsyncSubject<T>();
try
{
call(p, iot =>
{
if (iot.HasError)
{
subject.OnError(iot.Error);
}
else
{
subject.OnNext(iot.Value);
subject.OnCompleted();
}
}, null);
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
};
}
它适用于我的测试代码,我认为这是一个更好的整体解决方案。
此版本的解决方案旨在允许将不同数量的参数和用户状态传递给 FromCallbackPattern
扩展方法。
我从这个通用目的开始FromCallbackPattern
扩展方法:
public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this Action<Action<InvokeOperation<T>>> call,
IScheduler scheduler)
{
return Observable.Defer(() =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call(iot =>
{
subject.OnNext(iot);
subject.OnCompleted();
});
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
});
}
然后我需要一系列私有(private)Reduce
将各种服务调用减少到 Action<Action<InvokeOperation<T>>>
的扩展方法代表们:
private static Action<Action<InvokeOperation<T>>> Reduce<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
object state)
{
return a => call(a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P p, object state)
{
return a => call(p, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, object state)
{
return a => call(p1, p2, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, object state)
{
return a => call(p1, p2, p3, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state)
{
return a => call(p1, p2, p3, p4, a, state);
}
现在我可以写普通的FromCallbackPattern
扩展方法:
public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return o => call.Reduce(o).FromCallbackPattern(scheduler);
}
public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
}
然后,最后,原来的 SomeObservableFunc
/ObservableInvokeOperation
扩展方法(现在也重命名为 FromCallbackPattern
):
public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this RiaService<T> service,
Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P p, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, p4, state));
}
显然您需要替换对 RiaService<T>
的引用使用您的 RIA 服务类类型。
这些方法可以这样调用:
IObservable<InvokeOperation<int>> obs1 =
service
.FromCallbackPattern(
s => s.SomeFunc,
new SomeData(),
null, // user state
Scheduler.ThreadPool);
IObservable<InvokeOperation<int>> obs2 =
service
.FromCallbackPattern(
s => s.SomeOtherFunc,
42, "Hello", 3.14159265,
null, // user state
Scheduler.ThreadPool);
呸!现在怎么样了?
关于system.reactive - 如何将带有 InvokeOperation 回调的此函数转换为 Reactive Extensions?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7802135/
我有这段代码,但不知道如何通过 Lambda 表达式简化它? public void LoadEntities(QueryBuilder query, Action> callback, object
我有一个 RIA 服务数据服务,它有几个函数调用,如下所示: public InvokeOperation SomeFunc( SomeData data, Action> callb
我是一名优秀的程序员,十分优秀!