gpt4 book ai didi

c# - TPL 数据流 block

转载 作者:行者123 更新时间:2023-11-30 22:04:08 31 4
gpt4 key购买 nike

问题:为什么使用 WriteOnceBlock (或 BufferBlock )用于从另一个 BufferBlock<Action> 取回答案(类似回调) (取回答案发生在发布的 Action 中)导致死锁(在此代码中)?

我认为类中的方法可以被视为我们发送给对象的消息(就像 - 我认为 - Alan Kay 提出的有关 OOP 的原始观点)。所以我写了这个通用的 Actor有助于将普通对象转换为 Actor 的类(当然,由于可变性和其他因素,这里有很多看不见的漏洞,但这不是这里的主要问题)。

所以我们有这些定义:

public class Actor<T>
{
private readonly T _processor;
private readonly BufferBlock<Action<T>> _messageBox = new BufferBlock<Action<T>>();

public Actor(T processor)
{
_processor = processor;
Run();
}

public event Action<T> Send
{
add { _messageBox.Post(value); }
remove { }
}

private async void Run()
{
while (true)
{
var action = await _messageBox.ReceiveAsync();
action(_processor);
}
}
}

public interface IIdGenerator
{
long Next();
}

现在;为什么这段代码有效:

static void Main(string[] args)
{
var idGenerator1 = new IdInt64();

var idServer1 = new Actor<IIdGenerator>(idGenerator1);

const int n = 1000;
for (var i = 0; i < n; i++)
{
var t = new Task(() =>
{
var answer = new WriteOnceBlock<long>(null);

Action<IIdGenerator> action = x =>
{
var buffer = x.Next();

answer.Post(buffer);
};

idServer1.Send += action;

Trace.WriteLine(answer.Receive());
}, TaskCreationOptions.LongRunning); // Runs on a separate new thread
t.Start();
}

Console.WriteLine("press any key you like! :)");
Console.ReadKey();

Trace.Flush();
}

并且此代码不起作用:

static void Main(string[] args)
{
var idGenerator1 = new IdInt64();

var idServer1 = new Actor<IIdGenerator>(idGenerator1);

const int n = 1000;
for (var i = 0; i < n; i++)
{
var t = new Task(() =>
{
var answer = new WriteOnceBlock<long>(null);

Action<IIdGenerator> action = x =>
{
var buffer = x.Next();

answer.Post(buffer);
};

idServer1.Send += action;

Trace.WriteLine(answer.Receive());
}, TaskCreationOptions.PreferFairness); // Runs and is managed by Task Scheduler
t.Start();
}

Console.WriteLine("press any key you like! :)");
Console.ReadKey();

Trace.Flush();
}

不同 TaskCreationOptions用于创建 Task秒。也许我在这里对 TPL 数据流概念的理解是错误的,只是开始使用它([ThreadStatic] 隐藏在某处?)。

最佳答案

您的代码存在问题的是这部分:answer.Receive()。当您将它移动到 Action 中时,不会发生死锁:

var t = new Task(() =>
{
var answer = new WriteOnceBlock<long>(null);
Action<IIdGenerator> action = x =>
{
var buffer = x.Next();
answer.Post(buffer);
Trace.WriteLine(answer.Receive());
};
idServer1.Send += action;
});
t.Start();

那是为什么呢? answer.Receive();await answer.ReceiveAsnyc(); 不同,它会阻塞线程,直到返回答案。当您使用 TaskCreationOptions.LongRunning 时,每个任务都有自己的线程,所以没有问题,但没有它(TaskCreationOptions.PreferFairness 无关紧要)所有线程池线程都很忙等待,所以一切都慢得多。它实际上并没有死锁,正如您在使用 15 而不是 1000 时看到的那样。

还有其他解决方案有助于理解问题:

  • 在原始代码之前用ThreadPool.SetMinThreads(1000, 0);增加线程池。
  • 使用 ReceiveAsnyc:

Task.Run(async () =>
{
var answer = new WriteOnceBlock<long>(null);
Action<IIdGenerator> action = x =>
{
var buffer = x.Next();
answer.Post(buffer);
};
idServer1.Send += action;
Trace.WriteLine(await answer.ReceiveAsync());
});

关于c# - TPL 数据流 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25432979/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com