gpt4 book ai didi

c# - 线程等待时继续处理

转载 作者:行者123 更新时间:2023-11-30 23:08:38 26 4
gpt4 key购买 nike

我正在执行网络请求以获取消息,然后等待该消息的处理,然后再次重复整个过程。

消息的处理将长时间运行,并且线程可能处于等待状态,这可能允许它在其他地方使用。我想要的是继续 while 循环,获取更多消息并在线程空闲时处理它们。

当前同步代码:

while(!cancellationToken.IsCancelled) {
var message = await GetMessage();

await ProcessMessage(message); // I'll need it to continue from here if thread is released.
}

这个用到的场景是一个消息队列Consumer服务。

最佳答案

考虑到 async/await 的使用,您当前的代码不一定是 synchronous(在线程术语中 - 可以调用延续在不同的线程上),尽管获取消息和处理消息之间的依赖性显然必须得到支持。

Re: the thread may be in a waiting state that may allow it to be used elsewhere

等待编码良好的 I/O 绑定(bind)工作根本不需要消耗线程 - 参见 Stephen Cleary's There is no thread .假设这两个等待的任务是 IO 绑定(bind)的,您的代码在等待 IO 绑定(bind)工作时可能根本不会消耗任何线程,即您的应用程序的其余部分将使用线程池。因此,如果您唯一关心的是浪费线程,那么就不需要更多了。

但是,如果您关心的是性能和额外的吞吐量,如果下游有能力对 ProcessMessage 进行并发调用(例如,多个下游 Web 服务器或额外的数据库容量),那么您可以考虑并行化IO 绑定(bind)工作(同样,不需要更多线程池线程)

例如,如果您能够重写 GetMessages 调用以一次检索一个批处理,您可以试试这个:

var messages = await GetMessages(10);
var processTasks = messages
.Select(message => ProcessMessage(message));
await Task.WhenAll(processTasks);

(如果您不能触摸代码,您可以循环 GetMessages 以在 Task.WhenAll 之前检索 10 条单独的消息)

但是,如果您没有任何进一步的能力来执行并发 ProcessMessage 调用,那么您应该着眼于解决瓶颈 - 例如添加更多服务器、优化代码或并行化在 ProcessMessage 工作中完成的工作等。

理由是,正如您所说,GetMessages 从队列中检索数据。如果您没有能力处理您检索到的消息,那么您所能做的就是在其他地方对消息进行排队,这似乎毫无意义 - 而是将消息留在队列中,直到您准备好处理它们。队列深度还将创建积压工作的可见性,您可以对其进行监控。

编辑,回复:偶尔一个 ProcessMessage() 调用比其他调用花费的时间长得多

根据评论,OP 有更多信息表明偶尔的 ProcessMessage 调用比其他调用花费的时间长得多,并且希望在此期间继续处理其他消息。

一种方法是使用此 clever pattern here 将超时应用于并行任务,如果达到,将使任何长时间运行的 ProcessTasks 继续运行,并将继续处理下一批消息。

以下是潜在的危险,因为它需要仔细平衡超时(低于 1000 毫秒)与观察到的行为不当的 ProcessMessage 调用频率 - 如果超时与“慢”ProcessMessages,下游资源可能变得不堪重负。

更安全(但更复杂)的添加是通过 Task.IsCompleted 跟踪未完成的 ProcessMessage 任务的并发数,如果达到阈值,则等待完成足够多的这些任务以使积压工作达到安全水平。

while(!cancellationToken.IsCancelled) 
{
// Ideally, the async operations should all accept cancellationTokens too
var message = await GetMessages(10, cancellationToken);
var processTasks = messages
.Select(message => ProcessMessage(message, cancellationToken));
await Task.WhenAny(Task.WhenAll(processTasks),
Task.Delay(1000, cancellationToken));
}

回复:下游负载安全水平的节流 - TPL DataFlow更有可能在这里使用。

关于c# - 线程等待时继续处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46386834/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com