gpt4 book ai didi

c# - 如何聚合来自异步生产者的数据并将其写入文件?

转载 作者:太空狗 更新时间:2023-10-29 20:33:42 26 4
gpt4 key购买 nike

我正在学习 C# 中的异步/等待模式。目前我正在尝试解决这样的问题:

  • 有一个生产者(硬件设备)每秒生成 1000 个数据包。我需要将这些数据记录到一个文件中。

  • 设备只有一个ReadAsync() 方法来一次报告一个数据包。

  • 我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒一次。

  • 如果在下一批数据包准备好写入时写入过程没有及时完成,则写入操作应该失败。

到目前为止,我已经写了类似下面的内容。它有效,但我不确定这是否是解决问题的最佳方法。有什么意见或建议吗?解决此类生产者/消费者问题(消费者需要聚合从生产者那里收到的数据)的最佳做法是什么?

static async Task TestLogger(Device device, int seconds)
{
const int bufLength = 1000;
bool firstIteration = true;
Task writerTask = null;

using (var writer = new StreamWriter("test.log")))
{
do
{
var buffer = new byte[bufLength][];

for (int i = 0; i < bufLength; i++)
{
buffer[i] = await device.ReadAsync();
}

if (!firstIteration)
{
if (!writerTask.IsCompleted)
throw new Exception("Write Time Out!");
}

writerTask = Task.Run(() =>
{
foreach (var b in buffer)
writer.WriteLine(ToHexString(b));
});

firstIteration = false;
} while (--seconds > 0);
}
}

最佳答案

您可以使用以下想法,前提是刷新的标准是数据包的数量(最多 1000 个)。我没有测试它。它利用了 Stephen Cleary 的 AsyncProducerConsumerQueue<T> 特色 this question .

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
while (true)
{
var list = new List<byte>();
while (true)
{
token.ThrowIfCancellationRequested(token);
var packet = await _device.ReadAsync(token);
list.Add(packet);
if (list.Count == 1000)
break;
}
// push next batch
await _queue.EnqueueAsync(list.ToArray(), token);
}
}

// consumer
async Task LogAsync(CancellationToken token)
{
Task previousFlush = Task.FromResult(0);
CancellationTokenSource cts = null;
while (true)
{
token.ThrowIfCancellationRequested(token);
// get next batch
var nextBatch = await _queue.DequeueAsync(token);
if (!previousFlush.IsCompleted)
{
cts.Cancel(); // cancel the previous flush if not ready
throw new Exception("failed to flush on time.");
}
await previousFlush; // it's completed, observe for any errors
// start flushing
cts = CancellationTokenSource.CreateLinkedTokenSource(token);
previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
}
}

如果您不想让记录器失败,而是更愿意取消刷新并继续下一批,您可以通过对此代码进行最小的更改来实现。

回应@l3arnon 的评论:

  1. A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow. 4. You await previousFlush for errors just after you throw an exception which makes that line redundant. etc. In short: I think the possible added value doesn't justify this very complicated solution.
  1. “一个数据包不是一个字节,它是字节[]”——一个数据包一个字节,这从OP的代码中显而易见:buffer[i] = await device.ReadAsync() .那么,一批数据包就是byte[] .
  2. “您还没有使用 OP 的 ToHexString。” - 目标是展示如何使用 Stream.WriteAsync native 接受取消 token ,而不是 WriteLineAsync不允许取消。使用 ToHexString 很简单与 Stream.WriteAsync并且仍然利用取消支持:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
    Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
  3. “AsyncProducerConsumerQueue 的健壮性和测试远不如 .Net 的 TPL 数据流”- 我认为这不是一个确定的事实。但是,如果 OP 关心它,他可以使用常规 BlockingCollection ,它不会阻塞生产者线程。在等待下一批时阻塞消费者线程是可以的,因为写入是并行完成的。与此相反,您的 TPL 数据流版本带有一个 冗余 CPU 和锁定密集型操作:使用 logAction.Post(packet) 将数据从生产者管道移动到写入者管道, 逐字节。我的代码不这样做。

  4. “您在抛出异常后等待 previousFlush 以查找错误,这使得该行变得多余。” - 这条线不是多余的。也许,你错过了这一点:previousFlush.IsCompleted可以是true什么时候previousFlush.IsFaultedpreviousFlush.IsCancelled也是true .所以,await previousFlush与观察已完成任务的任何错误(例如,写入失败)相关,否则这些错误将丢失。

关于c# - 如何聚合来自异步生产者的数据并将其写入文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24055900/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com