gpt4 book ai didi

java - Google Dataflow批处理文件处理性能不佳

转载 作者:行者123 更新时间:2023-12-01 19:19:01 26 4
gpt4 key购买 nike

我正在尝试使用 Apache Beam 2.16.0 构建一个管道来处理大量 XML 文件。平均计数为每 24 小时 7000 万次,在峰值负载时可以达到 5 亿次。文件大小从 ~1 kb 到 200 kb 不等(有时可能更大,例如 30 mb)

文件经过各种转换,最终目的地是 BigQuery 表以供进一步分析。因此,首先我读取 xml 文件,然后反序列化为 POJO(在 Jackson 的帮助下),然后应用所有所需的转换。转换速度非常快,在我的机器上,我每秒可以进行大约 40000 次转换,具体取决于文件大小。

我主要关心的是文件读取速度。我感觉所有的阅读都是通过一名工作人员完成的,我不明白这是如何并行的。我在 10k 测试文件数据集上进行了测试。

我的本​​地计算机(MacBook pro 2018:ssd、16 GB RAM 和 6 核 i7 cpu)上的批处理作业可以解析大约 750 个文件/秒。如果我在 DataFlow 上使用 n1-standard-4 机器运行它,我只能获得大约 75 个文件/秒。它通常不会扩展,但即使扩展(有时最多 15 个工作人员),我每秒只能获得大约 350 个文件。

更有趣的是流工作。它立即从 6-7 个工作人员开始,在 UI 上我可以看到 1200-1500 个元素/秒,但通常它不会显示速度,如果我选择页面上的最后一项,它会显示它已经处理了 10000 个元素。

批处理作业和流作业之间的唯一区别是 FileIO 的此选项:

.continuously(Duration.standardSeconds(10), Watch.Growth.never()))

为什么这会对处理速度产生如此大的影响?

运行参数:

--runner=DataflowRunner
--project=<...>
--inputFilePattern=gs://java/log_entry/*.xml
--workerMachineType=n1-standard-4
--tempLocation=gs://java/temp
--maxNumWorkers=100

运行区域和存储桶区域相同。

管道:

pipeline.apply(
FileIO.match()
.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
.filepattern(options.getInputFilePattern())
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
.apply("xml to POJO", ParDo.of(new XmlToPojoDoFn()));

xml 文件示例:

<LogEntry><EntryId>0</EntryId>
<LogValue>Test</LogValue>
<LogTime>12-12-2019</LogTime>
<LogProperty>1</LogProperty>
<LogProperty>2</LogProperty>
<LogProperty>3</LogProperty>
<LogProperty>4</LogProperty>
<LogProperty>5</LogProperty>
</LogEntry>

现实生活中的文件和项目要复杂得多,有大量嵌套节点和大量转换规则。

GitHub 上的简化代码:https://github.com/costello-art/dataflow-file-io它只包含“瓶颈”部分 - 读取文件并反序列化为 POJO。

如果我可以在我的机器(这是一个功能强大的工作线程)上处理大约 750 个文件/秒,那么我预计在 Dataflow 中的类似 10 个工作线程上处理大约 7500 个文件/秒。

最佳答案

我试图编写一个具有某些功能的测试代码,以检查 FileIO.match 的行为和工作人员数量 [1]。

在此代码中,我将值 numWorkers 设置为 50,但您可以设置您需要的值。我可以看到 FileIO.match 方法将找到与这些模式匹配的所有链接,但之后,您必须单独处理每个文件的内容。

例如,在我的例子中,我创建了一个接收每个文件的方法,然后将内容除以“new_line (\n)”字符(但在这里您可以根据需要处理它,它还取决于文件类型(csv、xml,...)。

因此,我将每一行转换为 TableRow,BigQuery 可以理解的格式,并分别返回每个值 (out.output(tab)),这样,Dataflow 将根据 pipeline 的工作量来处理不同 Worker 中的线路,例如 3 个不同 Worker 中的 3000 条线路,每个 1000 条线路。

最后,由于这是一个批处理过程,Dataflow 将等待处理所有行,然后将其插入 BigQuery。

我希望这个测试代码对您有所帮助。

[1] https://github.com/GonzaloPF/dataflow-pipeline/blob/master/java/randomDataToBQ/src/main/fromListFilestoBQ.java

关于java - Google Dataflow批处理文件处理性能不佳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59382913/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com