- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个定期触发的事件。让我们假设处理事件需要大约 1 秒。我不想为每个接收到的事件等待 1 秒,而是想累积事件 直到最后的处理完成。处理完成后,我想处理上次处理期间收到的事件数据:
e1 e2 e3 e4 e5 e6 e7 events happening
---------------------------------------------------------------------------------------------------------------------------------------------------> time
1s 2s 3s 4s 5s 6s
p(e1) p(e2, e3) p(e4) p(e5, e6) p(e7)
[-----------------------][-----------------------] [-----------------------][-----------------------][-----------------------] processing of items
In above example, processing start as soon as e1 happens. While the processing takes places 2 more events have arrived. They should be stored so when p(e1) - which means the processing of e1 -
is finished the processing of the events e2 and e3 takes place.
This proces is similar to a rolling build: a changeset is checked in, the buildserver starts building and once the build is finished all changesets that have been
checked in during the build will then be processed.
我应该如何使用 Rx 做到这一点?
我试过将 Buffer 与打开和关闭选择器结合使用,但我无法正确使用。感谢任何示例或指导!
让我们假设一个 Subject<int>
作为输入流。
我试过类似的东西,但我完全迷路了。
var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
bc.OnNext(true);
String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
Thread.Sleep(300);
bc.OnNext(false);
});
最佳答案
这是非平凡的。幸运的是@DaveSexton 已经完成了所有艰苦的工作。你要BufferIntrospective
来自 Rxx 库。 Check out the source here .
这很难的原因是因为IObserver<T>
没有内置的方法来发出背压信号——除了 OnXXX 调用阻塞的微妙之处。 Observable需要关注Observer,需要引入并发来管理缓冲。
另请注意,如果您有多个订阅者,他们将获得不同的数据,因为他们收到的数据取决于源事件率和他们的消费率。
另一种方法是将所有事件添加到 OnNext 处理程序中的线程安全队列中,并有一个单独的任务在循环中清空队列。 BufferIntrospective
不过可能更干净。
玩了一会儿,这个玩具实现似乎很管用。但是 Rxx 会更健壮,所以这实际上只是教学来展示涉及的是什么类型的东西。关键是通过调度程序引入并发。
public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
this IObservable<TSource> source,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<IList<TSource>>(o => {
Subject<Unit> feedback = new Subject<Unit>();
var sourcePub = source.Publish().RefCount();
var sub = sourcePub.Buffer(
() => feedback).ObserveOn(scheduler).Subscribe(@event =>
{
o.OnNext(@event);
feedback.OnNext(Unit.Default);
},
o.OnError,
o.OnCompleted);
var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
return new CompositeDisposable(sub, start);
});
}
此示例代码显示用法以及两个不同节奏的订阅者如何获得不同的事件缓冲,一个接收 5 个批处理,另一个接收 10 个批处理。
我正在使用 LINQPad的 Dump
轻松显示每个缓冲区的内容。
var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);
var buffered = xs.BufferIntrospective();
buffered.Subscribe(x => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});
buffered.Subscribe(x => {
x.Dump();
Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});
关于c# - 处理项目时缓冲,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28880247/
SQLite、Content provider 和 Shared Preference 之间的所有已知区别。 但我想知道什么时候需要根据情况使用 SQLite 或 Content Provider 或
警告:我正在使用一个我无法完全控制的后端,所以我正在努力解决 Backbone 中的一些注意事项,这些注意事项可能在其他地方更好地解决......不幸的是,我别无选择,只能在这里处理它们! 所以,我的
我一整天都在挣扎。我的预输入搜索表达式与远程 json 数据完美配合。但是当我尝试使用相同的 json 数据作为预取数据时,建议为空。点击第一个标志后,我收到预定义消息“无法找到任何内容...”,结果
我正在制作一个模拟 NHL 选秀彩票的程序,其中屏幕右侧应该有一个 JTextField,并且在左侧绘制弹跳的选秀球。我创建了一个名为 Ball 的类,它实现了 Runnable,并在我的主 Draf
这个问题已经有答案了: How can I calculate a time span in Java and format the output? (18 个回答) 已关闭 9 年前。 这是我的代码
我有一个 ASP.NET Web API 应用程序在我的本地 IIS 实例上运行。 Web 应用程序配置有 CORS。我调用的 Web API 方法类似于: [POST("/API/{foo}/{ba
我将用户输入的时间和日期作为: DatePicker dp = (DatePicker) findViewById(R.id.datePicker); TimePicker tp = (TimePic
放宽“邻居”的标准是否足够,或者是否有其他标准行动可以采取? 最佳答案 如果所有相邻解决方案都是 Tabu,则听起来您的 Tabu 列表的大小太长或您的释放策略太严格。一个好的 Tabu 列表长度是
我正在阅读来自 cppreference 的代码示例: #include #include #include #include template void print_queue(T& q)
我快疯了,我试图理解工具提示的行为,但没有成功。 1. 第一个问题是当我尝试通过插件(按钮 1)在点击事件中使用它时 -> 如果您转到 Fiddle,您会在“内容”内看到该函数' 每次点击都会调用该属
我在功能组件中有以下代码: const [ folder, setFolder ] = useState([]); const folderData = useContext(FolderContex
我在使用预签名网址和 AFNetworking 3.0 从 S3 获取图像时遇到问题。我可以使用 NSMutableURLRequest 和 NSURLSession 获取图像,但是当我使用 AFHT
我正在使用 Oracle ojdbc 12 和 Java 8 处理 Oracle UCP 管理器的问题。当 UCP 池启动失败时,我希望关闭它创建的连接。 当池初始化期间遇到 ORA-02391:超过
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 9 年前。 Improve
引用这个plunker: https://plnkr.co/edit/GWsbdDWVvBYNMqyxzlLY?p=preview 我在 styles.css 文件和 src/app.ts 文件中指定
为什么我的条形这么细?我尝试将宽度设置为 1,它们变得非常厚。我不知道还能尝试什么。默认厚度为 0.8,这是应该的样子吗? import matplotlib.pyplot as plt import
当我编写时,查询按预期执行: SELECT id, day2.count - day1.count AS diff FROM day1 NATURAL JOIN day2; 但我真正想要的是右连接。当
我有以下时间数据: 0 08/01/16 13:07:46,335437 1 18/02/16 08:40:40,565575 2 14/01/16 22:2
一些背景知识 -我的 NodeJS 服务器在端口 3001 上运行,我的 React 应用程序在端口 3000 上运行。我在 React 应用程序 package.json 中设置了一个代理来代理对端
我面临着一个愚蠢的问题。我试图在我的 Angular 应用程序中延迟加载我的图像,我已经尝试过这个2: 但是他们都设置了 src attr 而不是 data-src,我在这里遗漏了什么吗?保留 d
我是一名优秀的程序员,十分优秀!