gpt4 book ai didi

c# - Reactive Extensions 看起来很慢——我做错了什么吗?

转载 作者:可可西里 更新时间:2023-11-01 08:01:05 26 4
gpt4 key购买 nike

我正在为一个每秒需要处理数千条消息的交易平台项目评估 Rx。现有平台有一个复杂的事件路由系统(多播委托(delegate))响应这些消息并进行大量后续处理。

我查看了 Reactive Extensions 的明显好处,但注意到它有点慢,通常慢 100 倍。

我创建了单元测试来演示这一点,它运行一个简单的增量 100 万次,使用各种 Rx 风格和直接开箱即用的委托(delegate)“控制”测试。

结果如下:

Delegate                                 - (1000000) - 00:00:00.0410000
Observable.Range() - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher - (1000000) - 00:00:03.0360000

如您所见,所有 Rx 方法都比委托(delegate)等效方法慢约 100 倍。显然,Rx 在幕后做了很多工作,这些工作将在更复杂的示例中使用,但这看起来非常慢。

这是正常现象还是我的测试假设无效?上面的 Nunit 代码如下 -

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;

namespace RxTests
{
[TestFixture]
class ReactiveExtensionsBenchmark_Tests
{
private int counter = 0;

[Test]
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;

Action<int> a = (i) => { counter++; };

DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
}

public void ObservableRangeTest(int iterations, Action<int> action)
{
counter = 0;

long start = DateTime.Now.Ticks;

Observable.Range(0, iterations).Subscribe(action);

OutputTestDuration("Observable.Range()", start);
}


public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;

var eventSubject = new Subject<int>();
var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
events.Subscribe(action);

long start = DateTime.Now.Ticks;

Enumerable.Range(0, iterations).ToList().ForEach
(
a => eventSubject.OnNext(1)
);

OutputTestDuration("Subject.Subscribe() - " + mode, start);
}

public void DelegateSmokeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;

Enumerable.Range(0, iterations).ToList().ForEach
(
a => action(1)
);

OutputTestDuration("Delegate", start);
}


/// <summary>
/// Output helper
/// </summary>
/// <param name="test"></param>
/// <param name="duration"></param>
public void OutputTestDuration(string test, long duration)
{
Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
}

/// <summary>
/// Test timing helper
/// </summary>
/// <param name="elapsedTicks"></param>
/// <returns></returns>
public string ElapsedDuration(long elapsedTicks)
{
return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
}

}
}

最佳答案

我的猜测是 Rx 团队首先专注于构建功能,而不关心性能优化。

使用分析器确定瓶颈并用您自己的优化版本替换慢速 Rx 类。

下面是两个例子。

结果:

Delegate                                 - (1000000) - 00:00:00.0368748Simple - NewThread                       - (1000000) - 00:00:00.0207676Simple - CurrentThread                   - (1000000) - 00:00:00.0214599Simple - Immediate                       - (1000000) - 00:00:00.0162026Simple - ThreadPool                      - (1000000) - 00:00:00.0169848FastSubject.Subscribe() - NewThread      - (1000000) - 00:00:00.0588149FastSubject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0508842FastSubject.Subscribe() - Immediate      - (1000000) - 00:00:00.0513911FastSubject.Subscribe() - ThreadPool     - (1000000) - 00:00:00.0529137

First of all, it seems to matter a lot how the observable is implemented. Here's an observable that cannot be unsubscribed from, but it's fast:

private IObservable<int> CreateFastObservable(int iterations)
{
return Observable.Create<int>(observer =>
{
new Thread(_ =>
{
for (int i = 0; i < iterations; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
}).Start();
return () => { };
});
}

测试:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;

var start = Stopwatch.StartNew();

var observable = CreateFastObservable(iterations);

observable.SubscribeOn(scheduler).Run(action);

OutputTestDuration("Simple - " + mode, start);
}

主题增加了很多开销。这是一个主题,它被剥夺了主题的大部分预期功能,但速度很快:

class FastSubject<T> : ISubject<T>
{
private event Action onCompleted;
private event Action<Exception> onError;
private event Action<T> onNext;

public FastSubject()
{
onCompleted += () => { };
onError += error => { };
onNext += value => { };
}

public void OnCompleted()
{
this.onCompleted();
}

public void OnError(Exception error)
{
this.onError(error);
}

public void OnNext(T value)
{
this.onNext(value);
}

public IDisposable Subscribe(IObserver<T> observer)
{
this.onCompleted += observer.OnCompleted;
this.onError += observer.OnError;
this.onNext += observer.OnNext;

return Disposable.Create(() =>
{
this.onCompleted -= observer.OnCompleted;
this.onError -= observer.OnError;
this.onNext -= observer.OnNext;
});
}
}

测试:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;

var start = Stopwatch.StartNew();

var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

observable.SubscribeOn(scheduler).Run(action);

OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}

关于c# - Reactive Extensions 看起来很慢——我做错了什么吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4272354/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com