- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们有一个消息队列。我们希望并行处理消息并限制同时处理的消息数。
我们下面的试验代码确实会并行处理消息,但它只会在前一个处理完成后才开始新的一批处理。我们希望在任务完成时重新启动任务。
换句话说:只要消息队列不为空,最大数量的任务就应该始终处于事件状态。
static string queue = @".\Private$\concurrenttest";
private static void Process(CancellationToken token)
{
Task.Factory.StartNew(async () =>
{
while (true)
{
IEnumerable<Task> consumerTasks = ConsumerTasks();
await Task.WhenAll(consumerTasks);
await PeekAsync(new MessageQueue(queue));
}
});
}
private static IEnumerable<Task> ConsumerTasks()
{
for (int i = 0; i < 15; i++)
{
Command1 message;
try
{
MessageQueue msMq = new MessageQueue(queue);
msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
Message msg = msMq.Receive();
message = (Command1)msg.Body;
}
catch (MessageQueueException mqex)
{
if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
yield break; // nothing in queue
else throw;
}
yield return Task.Run(() =>
{
Console.WriteLine("id: " + message.id + ", name: " + message.name);
Thread.Sleep(1000);
});
}
}
private static Task<Message> PeekAsync(MessageQueue msMq)
{
return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}
最佳答案
编辑
我花了很多时间考虑泵的可靠性 - 特别是如果从 MessageQueue
接收到消息,取消就变得很棘手 - 所以我提供了两种终止队列的方法:
CancellationToken
发出信号会尽快停止管道,这可能会导致消息丢失。MessagePump.Stop()
终止泵,但允许在 MessagePump.Completion
任务转换到RanToCompletion
。该解决方案使用 TPL 数据流(NuGet:Microsoft.Tpl.Dataflow)。
完整实现:
using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace StackOverflow.Q34437298
{
/// <summary>
/// Pumps the message queue and processes messages in parallel.
/// </summary>
public sealed class MessagePump
{
/// <summary>
/// Creates a <see cref="MessagePump"/> and immediately starts pumping.
/// </summary>
public static MessagePump Run(
MessageQueue messageQueue,
Func<Message, Task> processMessage,
int maxDegreeOfParallelism,
CancellationToken ct = default(CancellationToken))
{
if (messageQueue == null) throw new ArgumentNullException(nameof(messageQueue));
if (processMessage == null) throw new ArgumentNullException(nameof(processMessage));
if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
ct.ThrowIfCancellationRequested();
return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct);
}
private readonly TaskCompletionSource<bool> _stop = new TaskCompletionSource<bool>();
/// <summary>
/// <see cref="Task"/> which completes when this instance
/// stops due to a <see cref="Stop"/> or cancellation request.
/// </summary>
public Task Completion { get; }
/// <summary>
/// Maximum number of parallel message processors.
/// </summary>
public int MaxDegreeOfParallelism { get; }
/// <summary>
/// <see cref="MessageQueue"/> that is pumped by this instance.
/// </summary>
public MessageQueue MessageQueue { get; }
/// <summary>
/// Creates a new <see cref="MessagePump"/> instance.
/// </summary>
private MessagePump(MessageQueue messageQueue, Func<Message, Task> processMessage, int maxDegreeOfParallelism, CancellationToken ct)
{
MessageQueue = messageQueue;
MaxDegreeOfParallelism = maxDegreeOfParallelism;
// Kick off the loop.
Completion = RunAsync(processMessage, ct);
}
/// <summary>
/// Soft-terminates the pump so that no more messages will be pumped.
/// Any messages already removed from the message queue will be
/// processed before this instance fully completes.
/// </summary>
public void Stop()
{
// Multiple calls to Stop are fine.
_stop.TrySetResult(true);
}
/// <summary>
/// Pump implementation.
/// </summary>
private async Task RunAsync(Func<Message, Task> processMessage, CancellationToken ct = default(CancellationToken))
{
using (CancellationTokenSource producerCTS = ct.CanBeCanceled
? CancellationTokenSource.CreateLinkedTokenSource(ct)
: new CancellationTokenSource())
{
// This CancellationToken will either be signaled
// externally, or if our consumer errors.
ct = producerCTS.Token;
// Handover between producer and consumer.
DataflowBlockOptions bufferOptions = new DataflowBlockOptions {
// There is no point in dequeuing more messages than we can process,
// so we'll throttle the producer by limiting the buffer capacity.
BoundedCapacity = MaxDegreeOfParallelism,
CancellationToken = ct
};
BufferBlock<Message> buffer = new BufferBlock<Message>(bufferOptions);
Task producer = Task.Run(async () =>
{
try
{
while (_stop.Task.Status != TaskStatus.RanToCompletion)
{
// This line and next line are the *only* two cancellation
// points which will not cause dropped messages.
ct.ThrowIfCancellationRequested();
Task<Message> peekTask = WithCancellation(PeekAsync(MessageQueue), ct);
if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task)
{
// Stop was signaled before PeekAsync returned. Wind down the producer gracefully
// by breaking out and propagating completion to the consumer blocks.
break;
}
await peekTask.ConfigureAwait(false); // Observe Peek exceptions.
ct.ThrowIfCancellationRequested();
// Zero timeout means that we will error if someone else snatches the
// peeked message from the queue before we get to it (due to a race).
// I deemed this better than getting stuck waiting for a message which
// may never arrive, or, worse yet, let this ReceiveAsync run onobserved
// due to a cancellation (if we choose to abandon it like we do PeekAsync).
// You will have to restart the pump if this throws.
// Omit timeout if this behaviour is undesired.
Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false);
await buffer.SendAsync(message, ct).ConfigureAwait(false);
}
}
finally
{
buffer.Complete();
}
},
ct);
// Wire up the parallel consumers.
ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions {
CancellationToken = ct,
MaxDegreeOfParallelism = MaxDegreeOfParallelism,
SingleProducerConstrained = true, // We don't require thread safety guarantees.
BoundedCapacity = MaxDegreeOfParallelism,
};
ActionBlock<Message> consumer = new ActionBlock<Message>(async message =>
{
ct.ThrowIfCancellationRequested();
await processMessage(message).ConfigureAwait(false);
},
executionOptions);
buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion)
{
// If we got here, consumer probably errored. Stop the producer
// before we throw so we don't go dequeuing more messages.
producerCTS.Cancel();
}
// Task.WhenAll checks faulted tasks before checking any
// canceled tasks, so if our consumer threw a legitimate
// execption, that's what will be rethrown, not the OCE.
await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
}
}
/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndPeek.
/// </summary>
private static Task<Message> PeekAsync(MessageQueue messageQueue)
{
return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek);
}
/// <summary>
/// APM -> TAP conversion for MessageQueue.Begin/EndReceive.
/// </summary>
private static Task<Message> ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout)
{
return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndPeek);
}
/// <summary>
/// Allows abandoning tasks which do not natively
/// support cancellation. Use with caution.
/// </summary>
private static async Task<T> WithCancellation<T>(Task<T> task, CancellationToken ct)
{
ct.ThrowIfCancellationRequested();
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
using (ct.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs, false))
{
if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
{
// Cancellation task completed first.
// We are abandoning the original task.
throw new OperationCanceledException(ct);
}
}
// Task completed: synchronously return result or propagate exceptions.
return await task.ConfigureAwait(false);
}
}
}
用法:
using (MessageQueue msMq = GetQueue())
{
MessagePump pump = MessagePump.Run(
msMq,
async message =>
{
await Task.Delay(50);
Console.WriteLine($"Finished processing message {message.Id}");
},
maxDegreeOfParallelism: 4
);
for (int i = 0; i < 100; i++)
{
msMq.Send(new Message());
Thread.Sleep(25);
}
pump.Stop();
await pump.Completion;
}
不整洁但功能齐全的单元测试:
https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da
原始答案
正如我在评论中提到的,.NET 中已建立生产者/消费者模式,其中之一是管道。在 Microsoft 自己的 Stephen Toub 的“并行编程模式”中可以找到一个很好的例子(全文在这里:https://www.microsoft.com/en-au/download/details.aspx?id=19222,第 55 页)。
这个想法很简单:生产者不断地将东西放入队列中,消费者将其拉出并处理(与生产者并行,也可能彼此并行)。
这是一个消息管道示例,其中消费者使用同步、阻塞方法在项目到达时对其进行处理(我已将消费者并行化以适合您的场景):
void MessageQueueWithBlockingCollection()
{
// If your processing is continuous and never stops throughout the lifetime of
// your application, you can ignore the fact that BlockingCollection is IDisposable.
using (BlockingCollection<Message> messages = new BlockingCollection<Message>())
{
Task producer = Task.Run(() =>
{
try
{
for (int i = 0; i < 10; i++)
{
// Hand over the message to the consumer.
messages.Add(new Message());
// Simulated arrival delay for the next message.
Thread.Sleep(10);
}
}
finally
{
// Notify consumer that there is no more data.
messages.CompleteAdding();
}
});
Task consumer = Task.Run(() =>
{
ParallelOptions options = new ParallelOptions {
MaxDegreeOfParallelism = 4
};
Parallel.ForEach(messages.GetConsumingEnumerable(), options, message => {
ProcessMessage(message);
});
});
Task.WaitAll(producer, consumer);
}
}
void ProcessMessage(Message message)
{
Thread.Sleep(40);
}
上面的代码在大约 130-140 毫秒内完成,这正是您对消费者并行化的预期。
现在,在您的场景中,您正在使用更适合 TPL 数据流的 Task
和 async
/await
(Microsoft 官方支持的库专为并行和异步序列处理)。
这是一个小演示,展示了您将用于该作业的不同类型的 TPL 数据流处理 block :
async Task MessageQueueWithTPLDataflow()
{
// Set up our queue.
BufferBlock<Message> queue = new BufferBlock<Message>();
// Set up our processing stage (consumer).
ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions {
CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation.
MaxDegreeOfParallelism = 4
};
ActionBlock<Message> consumer = new ActionBlock<Message>(m => ProcessMessageAsync(m), options);
// Link the queue to the consumer.
queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
// Wire up our producer.
Task producer = Task.Run(async () =>
{
try
{
for (int i = 0; i < 10; i++)
{
queue.Post(new Message());
await Task.Delay(10).ConfigureAwait(false);
}
}
finally
{
// Signal to the consumer that there are no more items.
queue.Complete();
}
});
await consumer.Completion.ConfigureAwait(false);
}
Task ProcessMessageAsync(Message message)
{
return Task.Delay(40);
}
调整以上内容以使用您的 MessageQueue
并不难,您可以确信最终结果不会出现线程问题。如果我今天/明天有更多时间,我会这样做。
关于c# - 如何并行读取队列中的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34437298/
我遇到一种情况,我需要从某个主题读取(正在进行的)消息并将它们放入另一个 Queue 中。我怀疑我是否需要 jms Queue 或者我可以对内存中的 java Queue 感到满意。我将通过同一 jv
队列也是一种操作受限的线性数据结构,与栈很相似。 01、定义 栈的操作受限表现为只允许在队列的一端进行元素插入操作,在队列的另一端只允许删除操作。这一特性可以总结为先进先出(First In
队列的定义 队列(Queue):先进先出的线性表 队列是仅在队尾进行插入和队头进行删除操作的线性表 队头(front):线性表的表头端,即可删除端 队尾(rear):线性表的表尾端,即可插入端 由于这
Redis专题-队列 首先,想一想 Redis 适合做消息队列吗? 1、消息队列的消息存取需求是什么?redis中的解决方案是什么? 无非就是下面这几点: 0、数据可以顺序读
0. 学习目标 栈和队列是在程序设计中常见的数据类型,从数据结构的角度来讲,栈和队列也是线性表,是操作受限的线性表,它们的基本操作是线性表操作的子集,但从数据类型的角度来讲,它们与线性表又有着巨大的不
我想在 redis + Flask 和 Python 中实现一个队列。我已经用 RQ 实现了这样的查询,如果你有 Flask 应用程序和任务在同一台服务器上工作,它就可以正常工作。我想知道是否有可能创
我正在使用 Laravel 5.1,我有一个大约需要 2 分钟来处理的任务,这个任务特别是生成报告...... 现在,很明显,我不能让用户在我接受用户输入的同一页面上等待 2 分钟,而是我应该在后台处
我正在使用 Azure 队列,并且有多个不同的进程从队列中读取数据。 我的系统的构建方式假设每条消息只读取一次。 这个Microsoft article声称 Azure 队列具有至少一次传送保证,这可
我正在创建一个Thread::Queue元素数组。 我这样做是这样的: for (my $i=0; $i new; } 但是,当我在每个队列中填充这样的元素时 $queues[$index]->enq
我试图了解如何将我的 Mercurial 补丁推送到远程存储库(例如 bitbucket.org),而不必先应用它们(实际上提交它们)。我的动机是在最终完成之前首先对我的工作进行远程备份,并且能够与其
我的本地计算机上有一个 Mercurial 队列补丁,我需要与同事共享该补丁,但我不想将其提交到上游存储库。有没有一种简单的方法可以打包该补丁并与他分享? 最佳答案 mq 将补丁作为不带扩展名的文
Java 中是否有任何类提供与 Queue 相同的功能,但有返回对象的选项,并且不要删除它,只需将其设置在集合末尾? 最佳答案 Queue不直接提供这样的方法。但是,您可以使用 poll 和 add
我在Windows上使用Tortoise svn客户端,我需要能够一次提交来自不同子文件夹的更改文件-一次提交。像在提交之前将文件添加到队列中之类的?我该怎么做? Windows上是否还有另一个svn
好吧,我正在尝试对我的 DSAQueue 类进行单元测试,它显示我的 isEmpty()、isFull() 和 dequeue() 方法失败。 以下是我的 DSAQueue 代码。我认为我的 Dequ
我想尽量减少对传入请求的数据库查询。它目前需要写入 6 个不同的表。在返回响应之前不需要完成处理。因此,我考虑了 laravel 队列,但我想知道我是否也可以摆脱写入队列/作业表所需的单独查询。我可以
我正在学习队列数据结构。我想用链表创建队列。我想编程输出:10 20程序输出:队列为空-1 队列为空-1 我哪里出错了? 代码如下: class Node { int x; Node next
“当工作人员有空时,他们会根据主题的优先级列表从等待请求池中进行选择。在时间 t 到达的所有请求都可以在时间 t 进行分配。如果两名工作人员同时有空,则安排优先权分配给最近的工作最早安排的人。如果仍然
我正在开发一个巨大的应用程序,它使用一些子菜单、模式窗口、提示等。 现在,我想知道在此类应用程序中处理 Esc 和单击外部事件的正确方法。 $(document).keyup(function(e)
所以 如果我有一个队列 a --> b --> NULL; 当我使用函数时 void duplicate(QueueNodePtr pHead, QueueNodePtr *pTail) 它会给 a
我正在尝试为键盘输入实现 FIFO 队列,但似乎无法让它工作。我可以让键盘输入显示在液晶显示屏上,但这就是我能做的。我认为代码应该读取键盘输入并将其插入队列,然后弹出键盘输入并将值读取到液晶屏幕上。有
我是一名优秀的程序员,十分优秀!