- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在为一个每秒需要处理数千条消息的交易平台项目评估 Rx。现有平台有一个复杂的事件路由系统(多播委托(delegate))响应这些消息并进行大量后续处理。
我查看了 Reactive Extensions 的明显好处,但注意到它有点慢,通常慢 100 倍。
我创建了单元测试来演示这一点,它运行一个简单的增量 100 万次,使用各种 Rx 风格和直接开箱即用的委托(delegate)“控制”测试。
结果如下:
Delegate - (1000000) - 00:00:00.0410000
Observable.Range() - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher - (1000000) - 00:00:03.0360000
如您所见,所有 Rx 方法都比委托(delegate)等效方法慢约 100 倍。显然,Rx 在幕后做了很多工作,这些工作将在更复杂的示例中使用,但这看起来非常慢。
这是正常现象还是我的测试假设无效?上面的 Nunit 代码如下 -
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;
namespace RxTests
{
[TestFixture]
class ReactiveExtensionsBenchmark_Tests
{
private int counter = 0;
[Test]
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;
Action<int> a = (i) => { counter++; };
DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
}
public void ObservableRangeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Observable.Range(0, iterations).Subscribe(action);
OutputTestDuration("Observable.Range()", start);
}
public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var eventSubject = new Subject<int>();
var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
events.Subscribe(action);
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => eventSubject.OnNext(1)
);
OutputTestDuration("Subject.Subscribe() - " + mode, start);
}
public void DelegateSmokeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => action(1)
);
OutputTestDuration("Delegate", start);
}
/// <summary>
/// Output helper
/// </summary>
/// <param name="test"></param>
/// <param name="duration"></param>
public void OutputTestDuration(string test, long duration)
{
Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
}
/// <summary>
/// Test timing helper
/// </summary>
/// <param name="elapsedTicks"></param>
/// <returns></returns>
public string ElapsedDuration(long elapsedTicks)
{
return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
}
}
}
最佳答案
我的猜测是 Rx 团队首先专注于构建功能,而不关心性能优化。
使用分析器确定瓶颈并用您自己的优化版本替换慢速 Rx 类。
下面是两个例子。
结果:
Delegate - (1000000) - 00:00:00.0368748Simple - NewThread - (1000000) - 00:00:00.0207676Simple - CurrentThread - (1000000) - 00:00:00.0214599Simple - Immediate - (1000000) - 00:00:00.0162026Simple - ThreadPool - (1000000) - 00:00:00.0169848FastSubject.Subscribe() - NewThread - (1000000) - 00:00:00.0588149FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842FastSubject.Subscribe() - Immediate - (1000000) - 00:00:00.0513911FastSubject.Subscribe() - ThreadPool - (1000000) - 00:00:00.0529137
First of all, it seems to matter a lot how the observable is implemented. Here's an observable that cannot be unsubscribed from, but it's fast:
private IObservable<int> CreateFastObservable(int iterations)
{
return Observable.Create<int>(observer =>
{
new Thread(_ =>
{
for (int i = 0; i < iterations; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
}).Start();
return () => { };
});
}
测试:
public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var start = Stopwatch.StartNew();
var observable = CreateFastObservable(iterations);
observable.SubscribeOn(scheduler).Run(action);
OutputTestDuration("Simple - " + mode, start);
}
主题增加了很多开销。这是一个主题,它被剥夺了主题的大部分预期功能,但速度很快:
class FastSubject<T> : ISubject<T>
{
private event Action onCompleted;
private event Action<Exception> onError;
private event Action<T> onNext;
public FastSubject()
{
onCompleted += () => { };
onError += error => { };
onNext += value => { };
}
public void OnCompleted()
{
this.onCompleted();
}
public void OnError(Exception error)
{
this.onError(error);
}
public void OnNext(T value)
{
this.onNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
this.onCompleted += observer.OnCompleted;
this.onError += observer.OnError;
this.onNext += observer.OnNext;
return Disposable.Create(() =>
{
this.onCompleted -= observer.OnCompleted;
this.onError -= observer.OnError;
this.onNext -= observer.OnNext;
});
}
}
测试:
public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var start = Stopwatch.StartNew();
var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();
observable.SubscribeOn(scheduler).Run(action);
OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}
关于c# - Reactive Extensions 看起来很慢——我做错了什么吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4272354/
我正在尝试执行 vagrant up 但一直遇到此错误: ==> default: IOError: [Errno 13] Permission denied: '/usr/local/lib/pyt
我在容器 div 中有一系列动态创建的不同高度的 div。 Varying text... Varying text... Varying text... Varying text.
通过 cygwin 运行 vagrant up 时遇到以下错误。 stderr: /bin/bash: /home/vagrant/.ansible/tmp/ansible-tmp-14872260
今天要向小伙伴们介绍的是一个能够快速地把数据制作成可视化、交互页面的 Python 框架:Streamlit,分分钟让你的数据动起来! 犹记得我在做机器学习和数据分析方面的毕设时,
我是 vagrant 的新手,正在尝试将第二个磁盘添加到我正在用 vagrant 制作的虚拟机中。 我想出了如何在第一次启动虚拟机时连接磁盘,但是当我关闭机器时 然后再次备份(使用 'vagrant
我是一名优秀的程序员,十分优秀!