gpt4 book ai didi

c# - 解压缩 zip、解析文件并展平为 CSV

转载 作者:行者123 更新时间:2023-11-30 23:15:39 28 4
gpt4 key购买 nike

我正在尝试最大化以下任务的性能:

  • 枚举 zip 文件的目录
  • 在内存中提取 zip 以查找 .json 文件(处理嵌套的 zip)
  • 解析json文件
  • json 文件中的属性写入聚合的 .CSV 文件中

我想要的 TPL 布局是:

producer -> parser block -> batch block -> csv writer block

想法是单个生产者提取 zip 并找到 json 文件,将文本发送到并行运行的解析器 block (多消费者)。批处理 block 分组为 200 个批处理,写入器 block 在每次调用时将 200 行转储到 CSV 文件中。

问题:

  • jsonParseBlock TransformBlock 花费的时间越长,丢弃的消息就越多。我该如何防止这种情况?
  • 我怎样才能更好地利用 TPL 来最大化性能?

    class Item
    {
    public string ID { get; set; }
    public string Name { get; set; }
    }

    class Demo
    {
    const string OUT_FILE = @"c:\temp\tplflat.csv";
    const string DATA_DIR = @"c:\temp\tpldata";
    static ExecutionDataflowBlockOptions parseOpts = new ExecutionDataflowBlockOptions() { SingleProducerConstrained=true, MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
    static ExecutionDataflowBlockOptions writeOpts = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };

    public static void Run()
    {
    Console.WriteLine($"{Environment.ProcessorCount} processors available");
    _InitTest(); // reset csv file, generate test data if needed
    // start TPL stuff
    var sw = Stopwatch.StartNew();
    // transformer
    var jsonParseBlock = new TransformBlock<string, Item>(rawstr =>
    {
    var item = Newtonsoft.Json.JsonConvert.DeserializeObject<Item>(rawstr);
    System.Threading.Thread.Sleep(15); // the more sleep here, the more messages lost
    return item;
    }, parseOpts);

    // batch block
    var jsonBatchBlock = new BatchBlock<Item>(200);

    // writer block
    var flatWriterBlock = new ActionBlock<Item[]>(items =>
    {
    //Console.WriteLine($"writing {items.Length} to csv");
    StringBuilder sb = new StringBuilder();
    foreach (var item in items)
    {
    sb.AppendLine($"{item.ID},{item.Name}");
    }
    File.AppendAllText(OUT_FILE, sb.ToString());
    });

    jsonParseBlock.LinkTo(jsonBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
    jsonBatchBlock.LinkTo(flatWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // start doing the work
    var crawlerTask = GetJsons(DATA_DIR, jsonParseBlock);
    crawlerTask.Wait();
    flatWriterBlock.Completion.Wait();
    Console.WriteLine($"ALERT: tplflat.csv row count should match the test data");
    Console.WriteLine($"Completed in {sw.ElapsedMilliseconds / 1000.0} secs");
    }

    static async Task GetJsons(string filepath, ITargetBlock<string> queue)
    {
    int count = 1;
    foreach (var zip in Directory.EnumerateFiles(filepath, "*.zip"))
    {
    Console.WriteLine($"working on zip #{count++}");
    var zipStream = new FileStream(zip, FileMode.Open);
    await ExtractJsonsInMemory(zip, zipStream, queue);
    }
    queue.Complete();
    }

    static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
    {
    ZipArchive archive = new ZipArchive(stream);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
    if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
    {
    using (TextReader reader = new StreamReader(entry.Open(), Encoding.UTF8))
    {
    var jsonText = reader.ReadToEnd();
    await queue.SendAsync(jsonText);
    }
    }
    else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
    {
    await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
    }
    }
    }
    }

更新1

我添加了 async,但我不清楚如何等待所有数据流 block 完成(c#、async 和 tpl 的新功能)。我基本上想说,“继续运行直到所有队列/ block 都为空”。我添加了以下“等待”代码,并且似乎可以正常工作。

// wait for crawler to finish
crawlerTask.Wait();
// wait for the last block
flatWriterBlock.Completion.Wait();

最佳答案

简而言之,您的发布并忽略了返回值。您有两个选择:添加一个未绑定(bind)的 BufferBlock 来保存所有传入的数据,或者在 SendAsync 上添加 await,这将防止任何消息被发送掉线了。

static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
var archive = new ZipArchive(stream);
foreach (ZipArchiveEntry entry in archive.Entries)
{
if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
{
using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
{
var jsonText = reader.ReadToEnd();
await queue.SendAsync(jsonText);
}
}
else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
{
await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
}
}
}

您需要将异步一直拉回来,但这应该让您开始。

关于c# - 解压缩 zip、解析文件并展平为 CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42393721/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com