gpt4 book ai didi

c# - 处理项目时缓冲

转载 作者:太空狗 更新时间:2023-10-30 00:51:18 25 4
gpt4 key购买 nike

我有一个定期触发的事件。让我们假设处理事件需要大约 1 秒。我不想为每个接收到的事件等待 1 秒,而是想累积事件 直到最后的处理完成。处理完成后,我想处理上次处理期间收到的事件数据:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
1s 2s 3s 4s 5s 6s
p(e1) p(e2, e3) p(e4) p(e5, e6) p(e7)
[-----------------------][-----------------------] [-----------------------][-----------------------][-----------------------] processing of items

In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 -
is finished the processing of the events e2 and e3 takes place.

This proces is similar to a rolling build: a changeset is checked in, the buildserver starts building and once the build is finished all changesets that have been
checked in during the build will then be processed.

我应该如何使用 Rx 做到这一点?

我试过将 Buffer 与打开和关闭选择器结合使用,但我无法正确使用。感谢任何示例或指导!

让我们假设一个 Subject<int>作为输入流。

我试过类似的东西,但我完全迷路了。

var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
bc.OnNext(true);
String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
Thread.Sleep(300);
bc.OnNext(false);
});

最佳答案

这是非平凡的。幸运的是@DaveSexton 已经完成了所有艰苦的工作。你要BufferIntrospective来自 Rxx 库。 Check out the source here .

这很难的原因是因为IObserver<T>没有内置的方法来发出背压信号——除了 OnXXX 调用阻塞的微妙之处。 Observable需要关注Observer,需要引入并发来管理缓冲。

另请注意,如果您有多个订阅者,他们将获得不同的数据,因为他们收到的数据取决于源事件率和他们的消费率。

另一种方法是将所有事件添加到 OnNext 处理程序中的线程安全队列中,并有一个单独的任务在循环中清空队列。 BufferIntrospective不过可能更干净。

玩了一会儿,这个玩具实现似乎很管用。但是 Rxx 会更健壮,所以这实际上只是教学来展示涉及的是什么类型的东西。关键是通过调度程序引入并发。

public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
this IObservable<TSource> source,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<IList<TSource>>(o => {
Subject<Unit> feedback = new Subject<Unit>();
var sourcePub = source.Publish().RefCount();
var sub = sourcePub.Buffer(
() => feedback).ObserveOn(scheduler).Subscribe(@event =>
{
o.OnNext(@event);
feedback.OnNext(Unit.Default);
},
o.OnError,
o.OnCompleted);
var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
return new CompositeDisposable(sub, start);
});
}

此示例代码显示用法以及两个不同节奏的订阅者如何获得不同的事件缓冲,一个接收 5 个批处理,另一个接收 10 个批处理。

我正在使用 LINQPadDump轻松显示每个缓冲区的内容。

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);

var buffered = xs.BufferIntrospective();

buffered.Subscribe(x => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});

buffered.Subscribe(x => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});

关于c# - 处理项目时缓冲,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28880247/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com