gpt4 book ai didi

c# - 引导多个生产者和消费者

转载 作者:行者123 更新时间:2023-12-02 18:32:31 24 4
gpt4 key购买 nike

我有以下代码:

var channel = Channel.CreateUnbounded<string>();

var consumers = Enumerable
.Range(1, 5)
.Select(consumerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}
}));

var producers = Enumerable
.Range(1, 5)
.Select(producerNumber =>
Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");

await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}));

await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());

await Task.WhenAll(consumers);

它可以正常工作,但是我希望它在生产的同时进行消费。然而

await Task.WhenAll(producers)
.ContinueWith(_ => channel.Writer.Complete());

阻止消费者运行直到它完成,我想不出让他们都运行的方法?

最佳答案

代码有几个问题,包括忘记枚举 producerconsumers 枚举。 IEnumerable 是延迟求值的,因此在您使用例如 foreachToList 实际枚举它之前,不会生成任何内容。

如果使用得当,ContinueWith 也没有任何问题。它绝对比使用异常作为控制流更好、更便宜。

通过使用一些常见的信道编码模式,可以很多改进代码。

  1. 生产者拥有并封装 channel
  2. 生产者只公开读者

此外,ContinueWith 是发出 ChannelWriter 完成信号的极佳选择,因为我们根本不关心哪个线程会执行该操作.如果有的话,我们更愿意使用其中一个“工作”线程来避免线程切换。

假设生产者函数是:

async Task Produce(ChannelWriter<string> writer, int producerNumber)
{
return Task.Run(async () =>
{
var rnd = new Random();
for (var i = 0; i < 10; i++)
{
var t = $"Message {i}";
Console.WriteLine($"Producing {t} on producer {producerNumber}");

await channel.Writer.WriteAsync(t);
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
}
}

制作人

生产者可以是:

ChannelReader<string> ProduceData(int dop)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;

var tasks=Enumerable.Range(0,dop)
.Select(producerNumber => Produce(producerNumber))
.ToList();
_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));
.

return channel.Reader;
}

完成和错误传播

注意这一行:

_ =Task.WhenAll(tasks).ContinueWith(t=>writer.TryComplete(t.Exception));

这表示一旦生产者完成,编写者本身就应该完成,可能会出现任何异常。 continuation 在哪个线程上运行并不重要,因为它除了调用 TryComplete 之外什么都不做。

更重要的是,t=>writer.TryComplete(t.Exception) 将工作异常传播给下游消费者。否则消费者永远不会知道出了什么问题。如果您有一个数据库使用者,您会希望它在源中止时避免完成任何更改。

消费者

消费者方法可以是:

async Task Consume(ChannelReader<string> reader,int dop,CancellationToken token=default)
{
var tasks= Enumerable
.Range(1, dop)
.Select(consumerNumber =>
Task.Run(async () =>
{
await foreach(var item in reader.ReadAllAsync(token))
{
Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
}
}));
await Task.WhenAll(tasks);
}

在这种情况下,await Task.WhenAll(tasks); 枚举工作任务,从而启动它们。

生成所有生成的消息不需要其他任何东西。当所有生产者完成后,Channel.Reader 就完成了。发生这种情况时,ReadAllAsync 将继续向消费者提供所有剩余消息并退出。

组成

结合这两种方法非常简单:

var reader=Produce(10);
await Consume(reader);

一般模式

这是使用 Channel 的管道阶段的一般模式 - 从 ChannelReader 读取输入,将其写入内部 Channel 并仅返回拥有的 Channel 的 Reader。通过这种方式,stage 拥有 channel ,这使得完成和错误处理变得容易很多:

static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,int dop=1,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<TOut>();
var writer=channel.Writer;

var tasks=Enumerable.Range(0,dop)
.Select(async i=>Task.Run(async ()=>
{
await(var item in reader.ReadAllAsync(token))
{
try
{
...
await writer.WriteAsync(msg);
}
catch(Exception exc)
{
//Handle the exception and keep processing messages
}
}
},token));
_ =Task.WhenAll(tasks)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}

这允许将多个“阶段”链接在一起以形成管道:

var finalReader=Producer(...)
.Crunch1()
.Crunch2(10)
.Crunch3();
await foreach(var result in finalReader.ReadAllAsync())
{
...
}

生产者和消费者方法可以用相同的方式编写,例如允许创建数据导入管道:

var importTask = ReadFiles<string>(somePath)
.ParseCsv<string,Record[]>(10)
.ImportToDb<Record>(connectionString);

await importTask;

使用ReadFiles

static ChannelReader<string> ReadFiles(string folder)
{
var channel=Channel.CreateUnbounded<string>();
var writer=channel.Writer;

var task=Task.Run(async ()=>{
foreach(var path in Directory.EnumerateFiles(folder,"*.csv"))
{
await writer.WriteAsync(path);
}
});
task.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}

.NET 6 Parallel.ForEachAsync 更新

现在 .NET 6 在生产中得到支持,可以使用 Parallel.ForEachAsync 将并发消费者简化为:

static ChannelReader<TOut> Crunch<Tin,TOut>(this ChannelReader<Tin>,
int dop=1,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<TOut>();
var writer=channel.Writer;

var dop=new ParallelOptions {
MaxDegreeOfParallelism = dop,
CancellationToken = token
};
var task=Parallel.ForEachAsync(
reader.ReadAllAsync(token),
dop,
async item =>{
try
{
...
await writer.WriteAsync(msg);
}
catch(Exception exc)
{
//Handle the exception and keep processing messages
}
});
task.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel.Reader;
}

关于c# - 引导多个生产者和消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69254023/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com