gpt4 book ai didi

c# - 可从链式任务中观察到

转载 作者:太空狗 更新时间:2023-10-30 00:02:45 49 4
gpt4 key购买 nike

我正在尝试创建一个 Observable,其中每个项目都是通过异步任务生成的。下一个项目应该通过对前一个项目(共同递归)的结果的异步调用来产生。用“生成”的说法,这看起来像这样 - 除了 生成不支持异步(也不支持初始状态的委托(delegate)。

var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);

作为一个更具体的示例,要通过一次获取 100 条消息来查看 ServiceBus 队列中的所有消息,请按如下方式实现 ProduceFirst、Continue 和 ProduceNext:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

然后调用.SelectMany(i => i)IObservable<IEnumerable<BrokeredMessage>> 上把它变成一个IObservable<BrokeredMessage>

其中 _serviceBusReceiver 是接口(interface)的实例,如下所示:

public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

BrokeredMessage 来自https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

最佳答案

如果您打算推出自己的异步 Generate 函数,我建议您使用递归调度而不是包装一个 while 循环。

public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;

return Observable.Create<TResult>(async obs => {
return s.Schedule(await initialState(), async (state, self) =>
{
if (!condition(state))
{
obs.OnCompleted();
return;
}

obs.OnNext(resultSelector(state));

self(await iterate(state));

});
});
}

这有两个优点。首先,您可以取消它,使用一个简单的 while 循环是无法直接取消它的,事实上,您甚至不会在 observable 完成之前返回订阅函数。其次,这使您可以控制每个项目的调度/异步(这使测试变得轻而易举),这也使它更适合图书馆

关于c# - 可从链式任务中观察到,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31485493/

49 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com