gpt4 book ai didi

C# - 为什么我的 Disruptor 示例这么慢?

转载 作者:太空宇宙 更新时间:2023-11-03 13:52:24 24 4
gpt4 key购买 nike

我从 Stack Overflow 问题中获取了代码示例 Disruptor.NET example 并将其修改为“测量”时间。完整列表如下:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
/// <summary>
/// Mutable event object carrying a single 64-bit value.
/// </summary>
public sealed class ValueEntry
{
    private long _value;

    /// <summary>
    /// Creates an entry and writes a line to the console so each allocation is visible.
    /// </summary>
    public ValueEntry()
    {
        Console.WriteLine("New ValueEntry created");
    }

    /// <summary>The value carried by this entry.</summary>
    public long Value
    {
        get { return _value; }
        set { _value = value; }
    }
}

/// <summary>
/// Consumer side of the demo: stops the shared stopwatch and prints how long the
/// hand-off took together with the value that was received.
/// </summary>
public class ValueAdditionHandler : IEventHandler<ValueEntry>
{
    private const long MicrosecondsPerSecond = 1000L * 1000L;

    /// <summary>
    /// Called for each published entry.
    /// </summary>
    /// <param name="data">The entry that was published.</param>
    /// <param name="sequence">Sequence number of the entry.</param>
    /// <param name="endOfBatch">Not used by this handler.</param>
    public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
    {
        Program.sw.Stop();

        // Multiply before dividing. The previous form, ticks / (Frequency / 1000000),
        // truncated the ratio for frequencies that are not a multiple of 1 MHz and
        // divided by zero for frequencies below 1 MHz.
        long microseconds = Program.sw.ElapsedTicks * MicrosecondsPerSecond / Stopwatch.Frequency;

        Console.WriteLine("elapsed microseconds = " + microseconds);
        Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
    }
}

/// <summary>
/// Demo driver: publishes one random value per second through a Disruptor ring buffer
/// so that <see cref="ValueAdditionHandler"/> can report the hand-off time.
/// </summary>
class Program
{
    // Ring buffer capacity. The Disruptor requires a power of 2 here, not merely a multiple of 2.
    private static readonly int _ringSize = 16;
    private static readonly Random _random = new Random();

    /// <summary>Shared timer, restarted right before each publish and stopped by the handler.</summary>
    public static Stopwatch sw = Stopwatch.StartNew();

    static void Main(string[] args)
    {
        var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);
        disruptor.HandleEventsWith(new ValueAdditionHandler());
        var ringBuffer = disruptor.Start();

        // Publishes forever; the program only ends when the process is killed.
        for (;;)
        {
            int payload = _random.Next();

            // Claim the next slot and fill it in before publishing it.
            long slot = ringBuffer.Next();
            ValueEntry claimed = ringBuffer[slot];
            claimed.Value = payload;

            // Start timing immediately before the hand-off.
            sw.Restart();
            ringBuffer.Publish(slot);

            Console.WriteLine("Published entry {0}, value {1}", slot, claimed.Value);

            // Pause one second between events.
            Thread.Sleep(1000);
        }
    }
}
}

输出是:

New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
Published entry 0, value 1510145842
elapsed microseconds = 2205
Event handled: Value = 1510145842 (processed event 0
Published entry 1, value 1718075893
elapsed microseconds = 85
Event handled: Value = 1718075893 (processed event 1
Published entry 2, value 1675907645
elapsed microseconds = 32
Event handled: Value = 1675907645 (processed event 2
Published entry 3, value 1563009446
elapsed microseconds = 75
Event handled: Value = 1563009446 (processed event 3
Published entry 4, value 1782914062
elapsed microseconds = 34
Event handled: Value = 1782914062 (processed event 4
Published entry 5, value 1516398244
elapsed microseconds = 50
Event handled: Value = 1516398244 (processed event 5
Published entry 6, value 76829327
elapsed microseconds = 50
Event handled: Value = 76829327 (processed event 6

因此,将数据从一个线程传递到另一个线程大约需要 50 微秒,这一点都不快!而官方的说法是:“当前版本的 Disruptor 在每秒 100 万条消息的速率下,线程间的传递延迟约为 50 ns。”所以我的结果比预期慢了 1000 倍。

我的示例有什么问题以及如何实现 50 纳秒的速度?

我修改了上面的程序,现在收到 1 微秒的延迟,这要好得多。但是,我仍在等待 disruptor 模式专家的回应。我正在寻找一个可以证明我实际上可以在 50 纳秒内传递数据的示例。

我还使用 BlockingCollection 编写了相同的测试,平均收到 14 微秒,这证明 Disruptor 更快:

使用 BlockingCollection:

average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433

使用 Disruptor:

average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065

BlockingCollection 代码:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace DisruptorTest
{
/// <summary>
/// Item passed from producer to consumer; <see cref="Value"/> holds an int.
/// </summary>
public sealed class ValueEntry
{
    private int _value;

    /// <summary>Creates an empty entry. This variant does not log on construction.</summary>
    public ValueEntry()
    {
    }

    /// <summary>The value carried by this entry.</summary>
    public int Value
    {
        get { return _value; }
        set { _value = value; }
    }
}

//public class ValueAdditionHandler : IEventHandler<ValueEntry>
//{
// public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
// {

// long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
// Program.results[data.Value] = microseconds;
// //Console.WriteLine("elapsed microseconds = " + microseconds);
// //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
// }
//}

/// <summary>
/// Latency benchmark: hands <see cref="length"/> items from the main thread to a
/// consumer task through a bounded BlockingCollection, records the per-item
/// hand-off time in microseconds and prints a histogram.
/// </summary>
class Program
{
    public const int length = 10000000;
    public static Stopwatch[] sw = new Stopwatch[length];
    public static long[] results = new long[length];

    // Leading samples left out of the statistics because they could be extremely slow.
    private const int WarmupItems = 100;

    static BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(150);

    /// <summary>Converts Stopwatch ticks to whole microseconds.</summary>
    private static long TicksToMicroseconds(long ticks)
    {
        // Multiply before dividing. ticks / (Frequency / 1000000) truncates the ratio
        // and divides by zero when Frequency is below 1 MHz.
        return ticks * 1000000L / Stopwatch.Frequency;
    }

    static void Main(string[] args)
    {
        // One stopwatch per item, allocated up front so allocation is not part of the timing.
        for (int i = 0; i < length; i++)
        {
            sw[i] = Stopwatch.StartNew();
        }

        // Consumer: drains the collection until the producer calls CompleteAdding.
        Task consumer = Task.Factory.StartNew(() =>
        {
            foreach (ValueEntry ve in dataItems.GetConsumingEnumerable())
            {
                results[ve.Value] = TicksToMicroseconds(sw[ve.Value].ElapsedTicks);
            }
        }, TaskCreationOptions.LongRunning);

        // Producer: the item's Value doubles as its index into sw and results.
        for (int i = 0; i < length; i++)
        {
            ValueEntry entry = new ValueEntry();
            entry.Value = i;

            sw[i].Restart();
            dataItems.Add(entry);
        }

        // Wait for the consumer to drain every item instead of sleeping for a fixed time.
        // Wait() also rethrows any exception raised inside the consumer.
        dataItems.CompleteAdding();
        consumer.Wait();

        long average = 0;
        long minimum = long.MaxValue;
        int firstFive = 0;
        int fiveToTen = 0;
        int tenToThirty = 0;
        int moreThanThirty = 0;

        for (int i = WarmupItems; i < length; i++)
        {
            long sample = results[i];
            average += sample;
            if (sample < minimum)
            {
                minimum = sample;
            }

            if (sample < 5)
            {
                firstFive++;
            }
            else if (sample < 10)
            {
                fiveToTen++;
            }
            else if (sample < 30)
            {
                tenToThirty++;
            }
            else
            {
                moreThanThirty++;
            }
        }

        average /= (length - WarmupItems);
        Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThanThirty);
    }
}
}

Disruptor 代码:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
/// <summary>
/// Ring buffer event used by the Disruptor benchmark; carries one int.
/// </summary>
public sealed class ValueEntry
{
    private int _payload;

    /// <summary>Creates an empty entry without any console output.</summary>
    public ValueEntry()
    {
    }

    /// <summary>The value carried by this entry.</summary>
    public int Value
    {
        get { return _payload; }
        set { _payload = value; }
    }
}

/// <summary>
/// Consumer side of the benchmark: records, per item, the microseconds elapsed on
/// that item's stopwatch at the moment the handler sees it.
/// </summary>
public class ValueAdditionHandler : IEventHandler<ValueEntry>
{
    private const long MicrosecondsPerSecond = 1000L * 1000L;

    /// <summary>
    /// Stores the elapsed time for the received entry in <c>Program.results</c>.
    /// </summary>
    /// <param name="data">The received entry; its Value is the index into Program.sw and Program.results.</param>
    /// <param name="sequence">Not used by this handler.</param>
    /// <param name="endOfBatch">Not used by this handler.</param>
    public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
    {
        // Multiply before dividing. The previous form, ticks / (Frequency / 1000000),
        // truncated the ratio for frequencies that are not a multiple of 1 MHz and
        // divided by zero for frequencies below 1 MHz.
        long microseconds = Program.sw[data.Value].ElapsedTicks * MicrosecondsPerSecond / Stopwatch.Frequency;
        Program.results[data.Value] = microseconds;
    }
}

/// <summary>
/// Latency benchmark: publishes <see cref="length"/> entries through a Disruptor ring
/// buffer, lets <see cref="ValueAdditionHandler"/> record the per-item hand-off time
/// in microseconds and prints a histogram.
/// </summary>
class Program
{
    public const int length = 10000000;
    public static Stopwatch[] sw = new Stopwatch[length];
    public static long[] results = new long[length];

    // Leading samples left out of the statistics because they could be extremely slow.
    private const int WarmupItems = 100;

    // Ring buffer capacity. The Disruptor requires a power of 2 here, not merely a multiple of 2.
    private static readonly int _ringSize = 1024;

    static void Main(string[] args)
    {
        // One stopwatch per item, allocated up front so allocation is not part of the timing.
        for (int i = 0; i < length; i++)
        {
            sw[i] = Stopwatch.StartNew();
        }

        var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

        disruptor.HandleEventsWith(new ValueAdditionHandler());

        var ringBuffer = disruptor.Start();

        for (int i = 0; i < length; i++)
        {
            long sequenceNo = ringBuffer.Next();

            // The entry's Value doubles as its index into sw and results.
            ValueEntry entry = ringBuffer[sequenceNo];
            entry.Value = i;

            // Start timing immediately before the hand-off.
            sw[i].Restart();
            ringBuffer.Publish(sequenceNo);
        }

        // Block until the handler has processed every published entry, then stop the
        // processors. This replaces a fixed 5-second sleep, which could let the
        // statistics below run before all results were written.
        disruptor.Shutdown();

        long average = 0;
        long minimum = long.MaxValue;
        int firstFive = 0;
        int fiveToTen = 0;
        int tenToThirty = 0;
        int moreThanThirty = 0;

        for (int i = WarmupItems; i < length; i++)
        {
            long sample = results[i];
            average += sample;
            if (sample < minimum)
            {
                minimum = sample;
            }

            if (sample < 5)
            {
                firstFive++;
            }
            else if (sample < 10)
            {
                fiveToTen++;
            }
            else if (sample < 30)
            {
                tenToThirty++;
            }
            else
            {
                moreThanThirty++;
            }
        }

        average /= (length - WarmupItems);
        Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThanThirty);
    }
}
}

最佳答案

在这里,我修复了你的代码:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
/// <summary>
/// Ring buffer event for the throughput measurement; carries one int.
/// </summary>
public sealed class ValueEntry
{
    private int _current;

    /// <summary>Creates an empty entry; nothing is written to the console.</summary>
    public ValueEntry()
    {
    }

    /// <summary>The value carried by this entry.</summary>
    public int Value
    {
        get { return _current; }
        set { _current = value; }
    }
}

/// <summary>
/// Throughput measurement: publishes <see cref="length"/> entries to a Disruptor ring
/// buffer and prints the average time per entry in milliseconds.
/// No event handler is registered in this listing, so only the publishing side is timed.
/// </summary>
class Program
{
    public const int length = 1000000;
    public static Stopwatch sw;

    // Ring buffer capacity. The Disruptor requires a power of 2 here, not merely a multiple of 2.
    private static readonly int _ringSize = 1024;

    static void Main(string[] args)
    {
        // The timed span starts here, so it also covers constructing and starting the Disruptor.
        sw = Stopwatch.StartNew();

        var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

        var ringBuffer = disruptor.Start();

        for (int i = 0; i < length; i++)
        {
            long sequenceNo = ringBuffer.Next();

            ValueEntry entry = ringBuffer[sequenceNo];
            entry.Value = i;

            ringBuffer.Publish(sequenceNo);
        }

        // Was sw.Elapsed.Miliseconds(), which does not exist. TotalMilliseconds gives the
        // whole elapsed span as a double, including the fractional part.
        double elapsed = sw.Elapsed.TotalMilliseconds;

        // wait until all events are delivered
        Thread.Sleep(10000);

        // Was "/(double)length" with the left operand missing.
        double average = elapsed / (double)length;
        Console.WriteLine("average = " + average);
    }
}
}

这样应该可以正确测量每个条目(item)所需的时间。

关于 C# - 为什么我的 Disruptor 示例这么慢?,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/13334778/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com