
c# - How to read messages from a queue in parallel?

Reposted; author: 行者123; last updated: 2023-11-30 20:21:14

Situation

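We have a message queue. We want to process messages in parallel and limit the number of messages being processed at the same time.

Our trial code below does process messages in parallel, but it only starts a new batch after the previous batch has finished. We want a new task to start whenever a running one completes.

In other words: as long as the message queue is not empty, the maximum number of tasks should always be active.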
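To make the requirement concrete, here is a minimal sketch of "always keep N handlers busy". It uses a SemaphoreSlim as a sliding window, which is a different technique from the TPL Dataflow approach used in the answer. Several parts are assumptions for illustration:

- An in-memory ConcurrentQueue<T> stands in for the real MessageQueue.
- The loop simply ends when the queue is empty; a live MSMQ consumer would wait for new messages instead.
- The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: keeps up to maxDegreeOfParallelism handlers running and
// starts the next item as soon as any handler finishes (a sliding window, not batches).
public static class SlidingWindow
{
    // Returns the highest number of handlers observed running at the same time.
    public static async Task<int> ProcessAllAsync<T>(
        ConcurrentQueue<T> source, // stand-in for the real MessageQueue (assumption)
        Func<T, Task> handler,
        int maxDegreeOfParallelism)
    {
        using var slots = new SemaphoreSlim(maxDegreeOfParallelism);
        var running = new List<Task>();
        var gate = new object();
        int inFlight = 0, maxObserved = 0;

        while (true)
        {
            // Wait for a free slot before taking an item, so nothing is removed
            // from the source until there is capacity to process it.
            await slots.WaitAsync().ConfigureAwait(false);

            if (!source.TryDequeue(out T item))
            {
                slots.Release();
                break; // this sketch stops when the source is empty
            }

            running.Add(Task.Run(async () =>
            {
                lock (gate) { inFlight++; maxObserved = Math.Max(maxObserved, inFlight); }
                try
                {
                    await handler(item).ConfigureAwait(false);
                }
                finally
                {
                    lock (gate) { inFlight--; }
                    slots.Release(); // frees the slot immediately for the next item
                }
            }));
        }

        await Task.WhenAll(running).ConfigureAwait(false);
        return maxObserved;
    }
}
```

Each handler releases its slot in a finally block. As a result, the next item is taken as soon as any handler finishes, rather than after a whole batch of 15 as in the trial code.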

Trial code

static string queue = @".\Private$\concurrenttest";

private static void Process(CancellationToken token)
{
    Task.Factory.StartNew(async () =>
    {
        while (true)
        {
            IEnumerable<Task> consumerTasks = ConsumerTasks();
            await Task.WhenAll(consumerTasks);

            await PeekAsync(new MessageQueue(queue));
        }
    });
}

private static IEnumerable<Task> ConsumerTasks()
{
    for (int i = 0; i < 15; i++)
    {
        Command1 message;
        try
        {
            MessageQueue msMq = new MessageQueue(queue);
            msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
            Message msg = msMq.Receive();
            message = (Command1)msg.Body;
        }
        catch (MessageQueueException mqex)
        {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                yield break; // nothing in queue
            else throw;
        }
        yield return Task.Run(() =>
        {
            Console.WriteLine("id: " + message.id + ", name: " + message.name);
            Thread.Sleep(1000);
        });
    }
}

private static Task<Message> PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}

Best answer

Edit

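I spent a lot of time thinking about the reliability of the pump. Cancellation in particular becomes tricky once a message has been received from the MessageQueue. So I provide two ways of stopping the pump:

  • Signaling the CancellationToken stops the pipeline as quickly as possible, which may result in lost messages.
  • Calling MessagePump.Stop() terminates the pump, but lets messages that have already been taken off the queue finish processing before the MessagePump.Completion task transitions to RanToCompletion.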
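The two signals can be illustrated without MSMQ. The sketch below uses a hypothetical helper, AwaitUnlessStoppedAsync. It races a piece of work against a stop task, which is the same Task.WhenAny pattern the pump's producer uses with _stop.Task.

- A soft stop returns false, leaving the caller free to finish whatever it already holds.
- A cancelled work task surfaces as an OperationCanceledException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StopSignals
{
    // Hypothetical helper. Returns true if `work` finished first, false if `stop`
    // completed first (the soft-stop path: the pending work is simply abandoned).
    // A cancelled `work` task is rethrown as OperationCanceledException (the hard path).
    public static async Task<bool> AwaitUnlessStoppedAsync(Task work, Task stop)
    {
        if (await Task.WhenAny(work, stop).ConfigureAwait(false) == stop)
        {
            return false;
        }

        await work.ConfigureAwait(false); // observe faults and cancellation
        return true;
    }
}
```

In the pump itself, Stop() completes _stop with TrySetResult, so calling it more than once is harmless. The token path, by contrast, makes ThrowIfCancellationRequested and WithCancellation throw.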

The solution uses TPL Dataflow (NuGet: Microsoft.Tpl.Dataflow; the package is now published as System.Threading.Tasks.Dataflow).

Full implementation:

using System;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace StackOverflow.Q34437298
{
    /// <summary>
    /// Pumps the message queue and processes messages in parallel.
    /// </summary>
    public sealed class MessagePump
    {
        /// <summary>
        /// Creates a <see cref="MessagePump"/> and immediately starts pumping.
        /// </summary>
        public static MessagePump Run(
            MessageQueue messageQueue,
            Func<Message, Task> processMessage,
            int maxDegreeOfParallelism,
            CancellationToken ct = default(CancellationToken))
        {
            if (messageQueue == null) throw new ArgumentNullException(nameof(messageQueue));
            if (processMessage == null) throw new ArgumentNullException(nameof(processMessage));
            if (maxDegreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

            ct.ThrowIfCancellationRequested();

            return new MessagePump(messageQueue, processMessage, maxDegreeOfParallelism, ct);
        }

        private readonly TaskCompletionSource<bool> _stop = new TaskCompletionSource<bool>();

        /// <summary>
        /// <see cref="Task"/> which completes when this instance
        /// stops due to a <see cref="Stop"/> or cancellation request.
        /// </summary>
        public Task Completion { get; }

        /// <summary>
        /// Maximum number of parallel message processors.
        /// </summary>
        public int MaxDegreeOfParallelism { get; }

        /// <summary>
        /// <see cref="MessageQueue"/> that is pumped by this instance.
        /// </summary>
        public MessageQueue MessageQueue { get; }

        /// <summary>
        /// Creates a new <see cref="MessagePump"/> instance.
        /// </summary>
        private MessagePump(MessageQueue messageQueue, Func<Message, Task> processMessage, int maxDegreeOfParallelism, CancellationToken ct)
        {
            MessageQueue = messageQueue;
            MaxDegreeOfParallelism = maxDegreeOfParallelism;

            // Kick off the loop.
            Completion = RunAsync(processMessage, ct);
        }

        /// <summary>
        /// Soft-terminates the pump so that no more messages will be pumped.
        /// Any messages already removed from the message queue will be
        /// processed before this instance fully completes.
        /// </summary>
        public void Stop()
        {
            // Multiple calls to Stop are fine.
            _stop.TrySetResult(true);
        }

        /// <summary>
        /// Pump implementation.
        /// </summary>
        private async Task RunAsync(Func<Message, Task> processMessage, CancellationToken ct = default(CancellationToken))
        {
            using (CancellationTokenSource producerCTS = ct.CanBeCanceled
                ? CancellationTokenSource.CreateLinkedTokenSource(ct)
                : new CancellationTokenSource())
            {
                // This CancellationToken will be signaled either
                // externally or if our consumer errors.
                ct = producerCTS.Token;

                // Handover between producer and consumer.
                DataflowBlockOptions bufferOptions = new DataflowBlockOptions {
                    // There is no point in dequeuing more messages than we can process,
                    // so we'll throttle the producer by limiting the buffer capacity.
                    BoundedCapacity = MaxDegreeOfParallelism,
                    CancellationToken = ct
                };

                BufferBlock<Message> buffer = new BufferBlock<Message>(bufferOptions);

                Task producer = Task.Run(async () =>
                {
                    try
                    {
                        while (_stop.Task.Status != TaskStatus.RanToCompletion)
                        {
                            // This line and the next are the *only* two cancellation
                            // points which will not cause dropped messages.
                            ct.ThrowIfCancellationRequested();

                            Task<Message> peekTask = WithCancellation(PeekAsync(MessageQueue), ct);

                            if (await Task.WhenAny(peekTask, _stop.Task).ConfigureAwait(false) == _stop.Task)
                            {
                                // Stop was signaled before PeekAsync returned. Wind down the producer gracefully
                                // by breaking out and propagating completion to the consumer blocks.
                                break;
                            }

                            await peekTask.ConfigureAwait(false); // Observe Peek exceptions.

                            ct.ThrowIfCancellationRequested();

                            // Zero timeout means that we will error if someone else snatches the
                            // peeked message from the queue before we get to it (due to a race).
                            // I deemed this better than getting stuck waiting for a message which
                            // may never arrive, or, worse yet, letting this ReceiveAsync run unobserved
                            // due to a cancellation (if we choose to abandon it like we do PeekAsync).
                            // You will have to restart the pump if this throws.
                            // Omit the timeout if this behaviour is undesired.
                            Message message = await ReceiveAsync(MessageQueue, timeout: TimeSpan.Zero).ConfigureAwait(false);

                            await buffer.SendAsync(message, ct).ConfigureAwait(false);
                        }
                    }
                    finally
                    {
                        buffer.Complete();
                    }
                },
                ct);

                // Wire up the parallel consumers.
                ExecutionDataflowBlockOptions executionOptions = new ExecutionDataflowBlockOptions {
                    CancellationToken = ct,
                    MaxDegreeOfParallelism = MaxDegreeOfParallelism,
                    SingleProducerConstrained = true, // We don't require thread safety guarantees.
                    BoundedCapacity = MaxDegreeOfParallelism,
                };

                ActionBlock<Message> consumer = new ActionBlock<Message>(async message =>
                {
                    ct.ThrowIfCancellationRequested();

                    await processMessage(message).ConfigureAwait(false);
                },
                executionOptions);

                buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

                if (await Task.WhenAny(producer, consumer.Completion).ConfigureAwait(false) == consumer.Completion)
                {
                    // If we got here, the consumer probably errored. Stop the producer
                    // before we throw so we don't go dequeuing more messages.
                    producerCTS.Cancel();
                }

                // Task.WhenAll checks faulted tasks before checking any
                // canceled tasks, so if our consumer threw a legitimate
                // exception, that's what will be rethrown, not the OCE.
                await Task.WhenAll(producer, consumer.Completion).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// APM -> TAP conversion for MessageQueue.Begin/EndPeek.
        /// </summary>
        private static Task<Message> PeekAsync(MessageQueue messageQueue)
        {
            return Task.Factory.FromAsync(messageQueue.BeginPeek(), messageQueue.EndPeek);
        }

        /// <summary>
        /// APM -> TAP conversion for MessageQueue.Begin/EndReceive.
        /// </summary>
        private static Task<Message> ReceiveAsync(MessageQueue messageQueue, TimeSpan timeout)
        {
            // A BeginReceive call must be completed with EndReceive (not EndPeek).
            return Task.Factory.FromAsync(messageQueue.BeginReceive(timeout), messageQueue.EndReceive);
        }

        /// <summary>
        /// Allows abandoning tasks which do not natively
        /// support cancellation. Use with caution.
        /// </summary>
        private static async Task<T> WithCancellation<T>(Task<T> task, CancellationToken ct)
        {
            ct.ThrowIfCancellationRequested();

            TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

            using (ct.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs, false))
            {
                if (task != await Task.WhenAny(task, tcs.Task).ConfigureAwait(false))
                {
                    // Cancellation task completed first.
                    // We are abandoning the original task.
                    throw new OperationCanceledException(ct);
                }
            }

            // Task completed: synchronously return result or propagate exceptions.
            return await task.ConfigureAwait(false);
        }
    }
}
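The comment near the end of RunAsync relies on Task.WhenAll preferring faults over cancellation. Below is a small standalone check of that behaviour; the names are hypothetical. Task.FromCanceled stands in for the cancelled producer, and Task.FromException stands in for the failed consumer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllPriority
{
    // Awaits Task.WhenAll and reports the name of the exception type that surfaces.
    public static async Task<string> SurfacedExceptionAsync(Task a, Task b)
    {
        try
        {
            await Task.WhenAll(a, b).ConfigureAwait(false);
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Stand-in for a producer stopped via producerCTS.Cancel().
    public static Task Canceled() => Task.FromCanceled(new CancellationToken(canceled: true));

    // Stand-in for a consumer whose processMessage delegate threw.
    public static Task Faulted() => Task.FromException(new InvalidOperationException("processing failed"));
}
```

The results do not depend on argument order:

- Pairing Canceled() with Faulted() surfaces InvalidOperationException.
- Two cancelled tasks surface TaskCanceledException.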

Usage:

using (MessageQueue msMq = GetQueue())
{
    MessagePump pump = MessagePump.Run(
        msMq,
        async message =>
        {
            await Task.Delay(50);
            Console.WriteLine($"Finished processing message {message.Id}");
        },
        maxDegreeOfParallelism: 4
    );

    for (int i = 0; i < 100; i++)
    {
        msMq.Send(new Message());

        Thread.Sleep(25);
    }

    pump.Stop();

    await pump.Completion;
}

Messy but fully functional unit tests:

https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da

Original answer

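As I mentioned in the comments, .NET has established producer/consumer patterns, one of which is the pipeline. A good example can be found in "Patterns of Parallel Programming" by Microsoft's own Stephen Toub (full text here: https://www.microsoft.com/en-au/download/details.aspx?id=19222, page 55).

The idea is simple: the producer keeps putting items into a queue, and the consumer pulls them out and processes them. The consumer runs in parallel with the producer, and consumers may also run in parallel with each other.

Here is a message pipeline example in which the consumer uses synchronous, blocking methods to process items as they arrive (I have parallelized the consumer to fit your scenario):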

void MessageQueueWithBlockingCollection()
{
    // If your processing is continuous and never stops throughout the lifetime of
    // your application, you can ignore the fact that BlockingCollection is IDisposable.
    using (BlockingCollection<Message> messages = new BlockingCollection<Message>())
    {
        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                {
                    // Hand over the message to the consumer.
                    messages.Add(new Message());

                    // Simulated arrival delay for the next message.
                    Thread.Sleep(10);
                }
            }
            finally
            {
                // Notify consumer that there is no more data.
                messages.CompleteAdding();
            }
        });

        Task consumer = Task.Run(() =>
        {
            ParallelOptions options = new ParallelOptions {
                MaxDegreeOfParallelism = 4
            };

            Parallel.ForEach(messages.GetConsumingEnumerable(), options, message => {
                ProcessMessage(message);
            });
        });

        Task.WaitAll(producer, consumer);
    }
}

void ProcessMessage(Message message)
{
    Thread.Sleep(40);
}

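The code above completes in about 130-140 ms, which is what you would expect with a parallelized consumer. The producer adds the last of the 10 messages roughly 90-100 ms in, and that message then needs another 40 ms of processing. Sequential processing alone would take 10 × 40 = 400 ms.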
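A rough model supports that estimate. The sketch makes these idealized assumptions:

- Message i is added at about i × 10 ms.
- Each message takes 40 ms to process.
- At most 4 messages are processed at once.
- Each message goes to whichever consumer is idle first.

Thread start-up, partitioning and Thread.Sleep timer resolution are deliberately ignored, which plausibly accounts for real runs landing a little higher. The names are hypothetical.

```csharp
using System;

public static class PipelineTiming
{
    // Hypothetical, idealized model: message i arrives at i * arrivalMs, takes
    // processMs, and goes to whichever of `workers` consumers is idle first.
    // Ignores thread start-up, partitioning and timer resolution.
    public static int CompletionTimeMs(int messages, int arrivalMs, int processMs, int workers)
    {
        var freeAt = new int[workers]; // time at which each consumer becomes idle
        int finish = 0;
        for (int i = 0; i < messages; i++)
        {
            int w = 0; // pick the consumer that becomes idle earliest
            for (int k = 1; k < workers; k++)
                if (freeAt[k] < freeAt[w]) w = k;

            int start = Math.Max(i * arrivalMs, freeAt[w]);
            freeAt[w] = start + processMs;
            finish = Math.Max(finish, freeAt[w]);
        }
        return finish;
    }
}
```

With these parameters, CompletionTimeMs(10, 10, 40, 4) evaluates to 130. With a single consumer, CompletionTimeMs(10, 10, 40, 1) evaluates to 400.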

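In your scenario, however, you are using Task and async/await, which is a better fit for TPL Dataflow. TPL Dataflow is an officially supported Microsoft library designed for parallel and asynchronous sequence processing.

Here is a small demo of the different kinds of TPL Dataflow processing blocks you would use for the job: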

async Task MessageQueueWithTPLDataflow()
{
    // Set up our queue.
    BufferBlock<Message> queue = new BufferBlock<Message>();

    // Set up our processing stage (consumer).
    ExecutionDataflowBlockOptions options = new ExecutionDataflowBlockOptions {
        CancellationToken = CancellationToken.None, // Plug in your own in case you need to support cancellation.
        MaxDegreeOfParallelism = 4
    };

    ActionBlock<Message> consumer = new ActionBlock<Message>(m => ProcessMessageAsync(m), options);

    // Link the queue to the consumer.
    queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

    // Wire up our producer.
    Task producer = Task.Run(async () =>
    {
        try
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Post(new Message());

                await Task.Delay(10).ConfigureAwait(false);
            }
        }
        finally
        {
            // Signal to the consumer that there are no more items.
            queue.Complete();
        }
    });

    await consumer.Completion.ConfigureAwait(false);
}

Task ProcessMessageAsync(Message message)
{
    return Task.Delay(40);
}

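Adapting the above to use your MessageQueue is not hard, and you can be confident that the end result will be free of threading issues. I'll do it if I have more time today or tomorrow.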
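One detail is worth carrying over when adapting this to MessageQueue. If the producer were pulling from a MessageQueue, the unbounded BufferBlock in the demo would let it remove messages from the queue faster than they are processed. The full implementation above avoids this by setting BoundedCapacity, so the producer waits instead.

The same effect can be sketched with only the BCL, using BlockingCollection<T>'s boundedCapacity as a dependency-free stand-in for BufferBlock. The class and method names are hypothetical.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedHandOver
{
    // Counts how many of `attempts` items a bounded buffer accepts without waiting:
    // TryAdd(item) returns false immediately once the bounded capacity is reached.
    public static int FillWithoutWaiting(BlockingCollection<int> buffer, int attempts)
    {
        int accepted = 0;
        for (int i = 0; i < attempts; i++)
        {
            if (buffer.TryAdd(i)) accepted++;
        }
        return accepted;
    }

    // Producer/consumer over a bounded buffer; Add blocks the producer while the
    // buffer is full, so it never runs more than `capacity` items ahead.
    public static async Task<int> RunAsync(int items, int capacity)
    {
        using var buffer = new BlockingCollection<int>(boundedCapacity: capacity);
        int consumed = 0;

        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < items; i++) buffer.Add(i);
            }
            finally
            {
                buffer.CompleteAdding(); // lets the consumer's loop end
            }
        });

        Task consumer = Task.Run(() =>
        {
            foreach (int item in buffer.GetConsumingEnumerable())
            {
                consumed++;
                Thread.Sleep(1); // simulated processing
            }
        });

        await Task.WhenAll(producer, consumer).ConfigureAwait(false);
        return consumed;
    }
}
```

In the MSMQ version, the same bound means a message only leaves the queue once there is room to process it soon.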

The original question, "c# - How to read messages from a queue in parallel?", is on Stack Overflow: https://stackoverflow.com/questions/34437298/
