gpt4 book ai didi

system.reactive - 如何将带有 InvokeOperation 回调的此函数转换为 Reactive Extensions?

转载 作者:行者123 更新时间:2023-12-04 07:06:47 25 4
gpt4 key购买 nike

我有一个 RIA 服务数据服务,它有几个函数调用,如下所示:

public InvokeOperation<T> SomeFunc(
SomeData data,
Action<InvokeOperation<T>> callback,
object userState)

我如何将其与 Reactive Extensions 一起使用,以便我可以订阅回调并获得 InvokeOperation 结果?


更新:这是我目前对 Enigmativity 混合解决方案的实现。我需要实际的 InvokeOperation 而不仅仅是值,因为 InvokeOperation UserState 可能很有值(value)。需要注意的是,我根本没有测试过错误处理。

public static class ObservableEx
{
public static IObservable<InvokeOperation<T>> ObservableInvokeOperation<T, Tdat> (
Func<Tdat, Action<InvokeOperation<T>>, object, InvokeOperation<T>> func,
Tdat data,
System.Reactive.Concurrency.IScheduler scheduler )
{
return
Observable.Defer<InvokeOperation<T>>( () =>
FromCallbackPattern<Tdat, T>( func, scheduler )
.Invoke( data ) );
}

private static Func<P, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T> (
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler )
{
return p =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call( p, iot =>
{
if ( iot.HasError )
{
subject.OnError( iot.Error );
}
else
{
subject.OnNext( iot );
subject.OnCompleted();
}
}, p );
}
catch ( Exception ex )
{
subject.OnError( ex );
}
return subject.ObserveOn( scheduler );
};
}
}

使用给定函数

public InvokeOperation<int> SomeFunc(SomeData data, Action<InvokeOperation<int>> callback, object userState)

var myobs = ObservableEx.ObservableInvokeOperation<int, SomeData>( myRIAContext.SomeFunc, data, Scheduler.ThreadPool );

这对匹配给定函数签名的任何函数都非常有效。不幸的是,现在我遇到了一些变体,例如

Func<T1, Action<InvokeOperation<T>>, object>
Func<T1, T2, Action<InvokeOperation<T>>, object>

任何人有任何建议可以将其转换为能够处理我想抛给它的任何 InvokeOperation 方法吗?

最佳答案

EDIT1:请参阅下文,了解基于 Paul Betts 的回答和我的混合解决方案。

EDIT2:请参阅下文,了解基于 OP 更新的“第三代”解决方案。


回调有点难以处理,我必须说将它变成一个可观察的是一个很好的方法。

我有一个适合我的方法。

基本的方法是把SomeFunc运算成Func<T>然后调用Observable.Start在那上面。我把它包装在 Observable.Create 中为了保持干净,我添加了错误处理。我已经完成了基本测试,但没有太可靠的测试。

使用代码如下所示:

var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });

我假设您的 RIA 服务类是 RiaService<T>并 build 了SomeObservableFunc像这样的扩展方法:

    public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return Observable.Create<T>(o =>
{
var error = (Exception)null;
Func<T> call = () =>
{
var result = default(T);
var mre = new ManualResetEvent(false);
Action<InvokeOperation<T>> callback = iot =>
{
try
{
if (iot.HasError)
{
error = iot.Error;
}
else
{
result = iot.Value;
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
mre.Set();
}
};
try
{
service.SomeFunc(data, callback, null);
mre.WaitOne();
}
catch (Exception ex)
{
error = ex;
}
return result;
};

return Observable
.Start(call, scheduler)
.Subscribe(t =>
{
try
{
if (error == null)
{
o.OnNext(t);
}
else
{
o.OnError(error);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
}, ex => o.OnError(ex), () =>
{
if (error == null)
{
o.OnCompleted();
}
});
});
}

如果这对你有用,请大声说出来。


编辑1

我喜欢 Paul Betts 的解决方案,因为它没有使用 ManualResetEvent ,但它没有编译,也没有处理在 RIA 服务调用期间可能发生的内部错误,因此我创建了以下混合解决方案。

我的扩展方法现在看起来像这样:

    public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return
Observable.Defer<T>(() =>
FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
.Invoke(data));
}

这使用了重做的 FromCallbackPattern最初由 Paul Betts 创建:

    private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return p =>
{
var subject = new AsyncSubject<T>();
try
{
call(p, iot =>
{
if (iot.HasError)
{
subject.OnError(iot.Error);
}
else
{
subject.OnNext(iot.Value);
subject.OnCompleted();
}
}, null);
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
};
}

它适用于我的测试代码,我认为这是一个更好的整体解决方案。


编辑2

此版本的解决方案旨在允许将不同数量的参数和用户状态传递给 FromCallbackPattern扩展方法。

我从这个通用目的开始FromCallbackPattern扩展方法:

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this Action<Action<InvokeOperation<T>>> call,
IScheduler scheduler)
{
return Observable.Defer(() =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call(iot =>
{
subject.OnNext(iot);
subject.OnCompleted();
});
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
});
}

然后我需要一系列私有(private)Reduce将各种服务调用减少到 Action<Action<InvokeOperation<T>>> 的扩展方法代表们:

    private static Action<Action<InvokeOperation<T>>> Reduce<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
object state)
{
return a => call(a, state);
}

private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P p, object state)
{
return a => call(p, a, state);
}

private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, object state)
{
return a => call(p1, p2, a, state);
}

private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, object state)
{
return a => call(p1, p2, p3, a, state);
}

private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state)
{
return a => call(p1, p2, p3, p4, a, state);
}

现在我可以写普通的FromCallbackPattern扩展方法:

    public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return o => call.Reduce(o).FromCallbackPattern(scheduler);
}

public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
}

public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
}

public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
}

public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
}

然后,最后,原来的 SomeObservableFunc/ObservableInvokeOperation扩展方法(现在也重命名为 FromCallbackPattern ):

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this RiaService<T> service,
Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(state));
}

public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P p, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p, state));
}

public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, state));
}

public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, state));
}

public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, p4, state));
}

显然您需要替换对 RiaService<T> 的引用使用您的 RIA 服务类类型。

这些方法可以这样调用:

IObservable<InvokeOperation<int>> obs1 =
service
.FromCallbackPattern(
s => s.SomeFunc,
new SomeData(),
null, // user state
Scheduler.ThreadPool);

IObservable<InvokeOperation<int>> obs2 =
service
.FromCallbackPattern(
s => s.SomeOtherFunc,
42, "Hello", 3.14159265,
null, // user state
Scheduler.ThreadPool);

呸!现在怎么样了?

关于system.reactive - 如何将带有 InvokeOperation 回调的此函数转换为 Reactive Extensions?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7802135/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com