- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我想使用 Rx 来计算 2 个事件流的统计信息。
输入流
// stream1 --A---B----A-B-----A-----B----A--B|
// stream2 ----X---X-----------X--X---XX---X--X|
中间结果
窗口持续时间,其中窗口在 A 上打开并在 B 上关闭以及在这些窗口内引发的 stream2 事件的计数
// result ------1------0-----------2-------1| <-- count of stream2 events in [A-B] window
// 4 2 6 3 <-- paired with window [A-B] window duration
最终结果
按 stream2 事件的计数对中间结果进行分组,并返回每组的窗口持续时间统计信息,例如平均、最小和最大窗口持续时间
// output -----------------------------------0 1 2| <-- count of stream2 events in [A-B] window
// 2 3.5 6 <-- average [A-B] window duration for that count of stream2 events.
Rx 查询
public enum EventKind
{
START,
STOP,
OTHER
};
public struct Event1
{
public EventKind Kind;
public DateTime OccurenceTime;
};
var merge = stream1.Merge(stream2.Select(x => new Event1
{
Kind = EventKind.OTHER,
OccurenceTime = x
}))
.RemoveDisorder(x => x.OccurenceTime, new TimeSpan(0,0,10));
var shared = merge.Publish().RefCount();
// Windows open on START and close on STOP
var windows = shared.Window(
shared.Where(x => x.Kind == EventKind.START),
opening => shared.Where(x => x.Kind == EventKind.STOP));
// For each window we're interested in the duration of the window along with
// the count of OTHER events that were raised inside the window
//
var pairs = windows.Select(window => new
{
Duration = window
.Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
.Buffer(2,1) // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
.Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
.Select(x => x[1].OccurenceTime - x[0].OccurenceTime), // compute the latency
EventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count() // count the number of OTHER events in the window
}
);
我想简化可观察类型
IObservable<{IObservable<int>, IObservable<TimeSpan>}>
IObservable<{int, TimeSpan}>
这应该是可能的,因为每个窗口都有恰好 1 个持续时间和 1 个其他事件计数。
在这一点上,定义按 EventCount
对窗口进行分组的输出查询应该不会太困难。并选择窗口持续时间的统计信息,例如每组的 Min、Max、Avg。
var result = pairs
.GroupBy(pair => pair.EventCount)
.Select(g => new
{
EventCount = g.Key,
Min = g.Min(x => x.Duration),
Avg = g.Average(x => x.Duration),
Max = g.Max(x => x.Duration)
});
RemoveDisorder
是我用来对 OccurenceTime
上的合并观察结果进行排序的扩展方法.我需要它,因为我的输入流不是实时事件(如本例中所示),而是通过 Tx 从日志中读取。合并 2 个排序流的输出本身不再排序。
最佳答案
在使用 Rx 一段时间后,您可能遇到的一个常见场景是关于启动和停止事件。要正确处理它,有多种方法,具体取决于您的要求。
如果您的问题只是数据投影,请检查@Brandon 解决方案,关键是以不同的方式组合,例如使用 SelectMany
.如果你想保留 Select
运营商有必要返回一个 IObservable<T>
输入投影。
无论如何,我认为你的作文大体上有问题,我将在下面尝试说明。
使用 Window
运算符,就像您所做的那样,如果在开始流中发生多个连续事件,它将创建多个组。在您的代码中可能会出现问题,因为主事件流将在下一个事件发生时处理多次。
这个例子只是为了向您展示许多组的创建:
var subject = new Subject<Event1>();
var shared = subject.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
var values = shared.Where(a => a.Kind == EventKind.OTHER);
values.Window(start, a => stop).Subscribe(inner =>
{
Console.WriteLine("New Group Started");
inner.Subscribe(next =>
{
Console.WriteLine("Next = "+ next.Kind + " | " + next.OccurenceTime.ToLongTimeString());
}, () => Console.WriteLine("Group Completed"));
});
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now });
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now.AddSeconds(1) });
subject.OnNext(new Event1 { Kind = EventKind.OTHER, OccurenceTime = DateTime.Now.AddSeconds(2) });
subject.OnNext(new Event1 { Kind = EventKind.STOP, OccurenceTime = DateTime.Now.AddSeconds(3) });
结果:
New Group Started
New Group Started
Next = OTHER | 4:55:46 PM
Next = OTHER | 4:55:46 PM
Group Completed
Group Completed
也许需要这种行为,否则就需要其他组合。为了“驯服”事件流,我看到了三种不同的方法:
Switch
运算符)。要实现这些选项中的一个,一般来说,您可以使用多种不同的方法来实现。如果我理解你的问题,你正在寻找选项ONE。现在答案:
Window
, 太多代码:IObservable<Event1> sx= GetEventStream();
var shared = sx.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
shared.Window(start, a => stop)
.Select(sx =>
sx.Publish(b =>
b.Take(1)
.Select(c =>
{
var final = b.LastOrDefaultAsync().Select(a => a.OccurenceTime);
var comp = b.Where(d => d.Kind == EventKind.OTHER).Count();
return final.Zip(comp, (d,e) => new { Count = e, Time = d - c.OccurenceTime });
})
.Switch() // whatever operator here there's no difference
) // because is just 1
)
.Concat()
.Subscribe(next =>
{
Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
});
使用 GroupByUntil
,一种“hack”,但这是我的偏好:
IObservable<Event1> sx = GetEventStream();
var shared = sx.Publish().RefCount();
var stop = shared.Where(a => a.Kind == EventKind.STOP).Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
start.GroupByUntil(a => Unit.Default, a => stop)
.Select(newGroup =>
{
var creation = newGroup.Take(1);
var rightStream = shared.Where(a => a.Kind == EventKind.OTHER)
.TakeUntil(newGroup.LastOrDefaultAsync())
.Count();
var finalStream = stop.Take(1);
return creation.Zip(rightStream, finalStream, (a,b,c) => new { Count = b, Time = c.OccurenceTime - a.OccurenceTime });
})
.Concat()
.Subscribe(next =>
{
Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
});
不使用 Group/Window
与 Take(1)
在作文的最后添加 Repeat
运算符,但可能会导致意外行为,因为“重新订阅”(将取决于它是冷还是热可观察对象,以及使用的调度程序)。
创建一个声明您自己的扩展方法的自定义实现,并不像看起来那么难,可能是最好的选择,但需要一段时间才能实现。
您的作文的另一个问题是无法获得统计数据,因为您没有办法完成 GroupBy
中的每个新组。运营商。
我建议重新考虑您的方法,解决方案可能会以某种方式合并时间。有关统计信息和 Rx 的更多信息,请查看: http://www.codeproject.com/Tips/853256/Real-time-statistics-with-Rx-Statistical-Demo-App
关于c# - 接收 : Pairing window duration with count of events raised inside the window,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28331039/
我正在尝试使用普通的 Windows Metro 风格应用程序执行以下操作: public class MyButton : Button { public Duration Duration
有下表 "CREATE TABLE IF NOT EXISTS user_preferences (" + " user_id text,"
假设我有: t := 10 * time.Second // 10s, 当 time.Second 再次应用时,幕后发生了什么? tt := t * time.Second // -2346317h4
在 Java 8 中, Duration 类(class)提供了 toDays 方法,返回总天数作为与日历天无关的 24 小时时间块的计数。 在 Java 9 中,Duration上课得心应手 to…
这很奇怪,经过数小时的测试我仍然无法弄清楚。 好的,这就是我要做的:合并两个视频,也就是一个接一个地追加。 我拍摄了两个视频,然后有两个网址。然后我使用以下方法创建了两个 AVURLAsset: A
正如标题所说,我得到一个字符串'01:23.290',它看起来像一个Duration,但不是。现在我需要用它来与真实的 Duration 进行比较,而我不知道如何处理它。有什么方法吗? 最佳答案 使用
std::chrono::duration的默认构造函数定义如下: constexpr duration() = default; (例如,参见 cppreference.com 或 libstdc+
我正在尝试在我的应用程序中录制视频,我注意到在显示它们的duration 时,我看到了错误的分钟\秒。只有通过以下代码录制的视频才会发生这种情况。通过其他应用录制的视频,时长显示在右侧: publi
我对 AVPlayer.timeControlStatus 属性进行 KVO 处理,播放器有一个 AVPlayerItem。 该视频是托管在远程服务器上的 mpeg4 编码的 10 秒视频文件: le
我正在使用 Spring Reactor Core 3.0.6 并且我有一个返回 Flux 的方法: public Flux createFlux(){ return Flux.,String
当然,我在这里做了一些愚蠢的事情,但我在编译我的简单秒表类时遇到了问题。错误是: /usr/include/c++/4.9/chrono:246:2: error: static assertion
此代码在 gcc-4.8 上编译但在 clang-3.3 上失败?以及如何使这段代码可以在 clang 上编译? =\ #include #include #include void sleep
我正在尝试编写一个允许用户指定 chrono::duration 的函数,例如 chrono::seconds 并返回 chrono 的结果::duration::count. 我可以使用以下模板函数
我不明白在 Go 中划分一个 time.Duration 是什么意思。 例如,这是 super 可爱的: d,_ := time.ParseDuration("4s") fmt.Println(d/4
我是一个初学者程序员,在 scala 中有一个非常简单的问题,我想将一个 long var 转换为 Duration (import scala.concurrent.duration.Duratio
我想知道这两者之间的区别是什么 boost::timed_mutex _mutex; if(_mutex.timed_lock(boost::get_system_time() + boost::po
我正在尝试实现 JUnit 测试来测试参与者。 我有这个 ActorTest : import org.junit.Test; import play.libs.Akka; import playte
假设您有一个涉及两个 Web 应用程序的项目(将共享 DAL/DAO/BO 程序集和一些 OSS 库): 一个半复杂的管理应用程序,使用 Windows Live ID 进行身份验证,并且还能够与各种
根据 moment.js documentation您可以创建 moment 的本地实例,以使用全局设置以外的其他语言环境来格式化日期。 使用 format() 效果很好,但我如何在本地实例上使用 d
我的项目提示“调用中有一个额外的参数‘duration’”。这是它发生的地方 required init(coder aDecoder: NSCoder) { workout = Workou
我是一名优秀的程序员,十分优秀!