gpt4 book ai didi

c# - 多个消费者不丢失消息

转载 作者:行者123 更新时间:2023-12-02 18:10:22 25 4
gpt4 key购买 nike

我有大量的东西通过 redis pub/sub 传输,我需要将它分发到多个 websocket 连接,所以基本上无论何时来自 redis 的消息,都需要通过所有 websockets 连接分发。

我想要多个消费者。他们每个人都应该收到所有消息。

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false
});
var cts = new CancellationTokenSource();


var producer = Task.Run(async () =>
{
int i = 0;
while (!cts.IsCancellationRequested)
{
channel.Writer.TryWrite(i++);

await Task.Delay(TimeSpan.FromMilliseconds(250));
}
});

var readerOneTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader one: {i}");
}
});

var readerTwoTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader two: {i}");
}
});

cts.CancelAfter(TimeSpan.FromSeconds(5));

Console.ReadLine();

最佳答案

Channel<T>不能向多个消费者广播消息。每次从 channel 中读取消息时,消息都会被消费,其他消费者将无法获取它。如果你想向所有消费者广播所有消息,你必须创建一个专用的 Channel<T>每个消费者。

您可能会发现这个问题很有趣:Factory for IAsyncEnumerable or IAsyncEnumerator .它显示了为 IAsyncEnumerable<T> 实现源/ Controller 的各种方法。序列,包括 channel 和 Rx 主题。


更新:下面是一个演示,演示如何使用多个 channel ,以便将所有消息传播给所有消费者。

List<Channel<int>> channels = new();

async Task CreateConsumer(Func<Channel<int>, Task> body)
{
var channel = Channel.CreateUnbounded<int>();
lock (channels) channels.Add(channel);
try
{
await Task.Run(() => body(channel)).ConfigureAwait(false);
}
finally
{
lock (channels) channels.Remove(channel);
}
}

Task consumer1 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer one: {i}");
}
});

Task consumer2 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer two: {i}");
}
});

using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
int i = 0;
while (true)
{
i++;
lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
catch (OperationCanceledException) { break; }
}
});

try { producer.Wait(); } catch { }
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(producer, consumer1, consumer2);

Try it on Fiddle .

CreateConsumer是一个异步方法,负责创建 channel 并将其添加到列表中。它还负责在消费者完成时从列表中删除 channel 。这很重要,否则如果消费者失败,生产者将继续在死 channel 中推送消息,从而导致内存泄漏。

消费者的“主体”(每个消费者可能不同)作为异步 lambda 传递给 CreateConsumer方法。

在启动生产者之前,启动所有消费者并创建他们的 channel 非常重要。这就是为什么 CreateConsumer方法包裹在 Task.Run 中.这样CreateConsumer里面的代码直到第一个await在调用 CreateConsumer 的同一线程上同步运行.

每次访问带有 channel 的列表都受到 lock 的保护,因为多个线程可能会同时尝试读取/修改列表。

关于c# - 多个消费者不丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72479688/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com