- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Apache Beam 2.16.0 构建一个管道来处理大量 XML 文件。平均计数为每 24 小时 7000 万次,在峰值负载时可以达到 5 亿次。文件大小从 ~1 kb 到 200 kb 不等(有时可能更大,例如 30 mb)
文件经过各种转换,最终目的地是 BigQuery 表以供进一步分析。因此,首先我读取 xml 文件,然后反序列化为 POJO(在 Jackson 的帮助下),然后应用所有所需的转换。转换速度非常快,在我的机器上,我每秒可以进行大约 40000 次转换,具体取决于文件大小。
我主要关心的是文件读取速度。我感觉所有的阅读都是通过一名工作人员完成的,我不明白这是如何并行的。我在 10k 测试文件数据集上进行了测试。
我的本地计算机(MacBook pro 2018:ssd、16 GB RAM 和 6 核 i7 cpu)上的批处理作业可以解析大约 750 个文件/秒。如果我在 DataFlow 上使用 n1-standard-4 机器运行它,我只能获得大约 75 个文件/秒。它通常不会扩展,但即使扩展(有时最多 15 个工作人员),我每秒只能获得大约 350 个文件。
更有趣的是流工作。它立即从 6-7 个工作人员开始,在 UI 上我可以看到 1200-1500 个元素/秒,但通常它不会显示速度,如果我选择页面上的最后一项,它会显示它已经处理了 10000 个元素。
批处理作业和流作业之间的唯一区别是 FileIO 的此选项:
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
为什么这会对处理速度产生如此大的影响?
运行参数:
--runner=DataflowRunner
--project=<...>
--inputFilePattern=gs://java/log_entry/*.xml
--workerMachineType=n1-standard-4
--tempLocation=gs://java/temp
--maxNumWorkers=100
运行区域和存储桶区域相同。
管道:
pipeline.apply(
FileIO.match()
.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
.filepattern(options.getInputFilePattern())
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
.apply("xml to POJO", ParDo.of(new XmlToPojoDoFn()));
xml 文件示例:
<LogEntry><EntryId>0</EntryId>
<LogValue>Test</LogValue>
<LogTime>12-12-2019</LogTime>
<LogProperty>1</LogProperty>
<LogProperty>2</LogProperty>
<LogProperty>3</LogProperty>
<LogProperty>4</LogProperty>
<LogProperty>5</LogProperty>
</LogEntry>
现实生活中的文件和项目要复杂得多,有大量嵌套节点和大量转换规则。
GitHub 上的简化代码:https://github.com/costello-art/dataflow-file-io它只包含“瓶颈”部分 - 读取文件并反序列化为 POJO。
如果我可以在我的机器(这是一个功能强大的工作线程)上处理大约 750 个文件/秒,那么我预计在 Dataflow 中的类似 10 个工作线程上处理大约 7500 个文件/秒。
最佳答案
我试图编写一个具有某些功能的测试代码,以检查 FileIO.match
的行为和工作人员数量 [1]。
在此代码中,我将值 numWorkers 设置为 50,但您可以设置您需要的值。我可以看到 FileIO.match 方法将找到与这些模式匹配的所有链接,但之后,您必须单独处理每个文件的内容。
例如,在我的例子中,我创建了一个接收每个文件的方法,然后将内容除以“new_line (\n
)”字符(但在这里您可以根据需要处理它,它还取决于文件类型(csv、xml,...)。
因此,我将每一行转换为 TableRow
,BigQuery 可以理解的格式,并分别返回每个值 (out.output(tab)
),这样,Dataflow 将根据 pipeline 的工作量来处理不同 Worker 中的线路,例如 3 个不同 Worker 中的 3000 条线路,每个 1000 条线路。
最后,由于这是一个批处理过程,Dataflow 将等待处理所有行,然后将其插入 BigQuery。
我希望这个测试代码对您有所帮助。
关于java - Google Dataflow批处理文件处理性能不佳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59382913/
我的问题是,在幕后,对于元素级 Beam DoFn (ParDo),Cloud Dataflow 的并行工作负载如何?例如,在我的 ParDO 中,我向外部服务器发送一个 http 请求以获取一个元素
就 Google Cloud 上 Dataflow 的 HA 而言,最佳架构是什么?我的工作负载在两个区域运行。数据流从一个多区域存储桶中读取并将结果写出到另一个多区域存储桶中。 为了实现高可用性(以
如图 here数据流管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。 这是一些伪代码来说明我想要实现的内容:
在旧的定价页面上,他们提到 Cloud Dataflow 工作人员使用的所有 Google Compute 实例都是根据持续使用价格规则计费的,但新的定价页面不再提及。 我假设由于它在内部使用相同的
批处理 Dataflow 作业处理完所有数据后是否可以执行操作?具体来说,我想将管道刚刚处理的文本文件移动到不同的 GCS 存储桶。我不确定将它放在我的管道中的哪个位置以确保它在数据处理完成后执行一次
我希望能够通过自定义键使用分组,但这是我目前的尝试, 我们为 KV 对象的键使用自定义类,因为我们希望 GroupBy 具有更复杂的条件,而不是使用 String 等进行简单的键匹配。 ```
当尝试在 Dataflow 服务上运行管道时,我在命令行上指定了暂存和临时存储桶(在 GCS 中)。当程序执行时,我在管道运行之前收到一个 RuntimeException,根本原因是我在路径中遗漏了
我试图找到一种优雅地结束我的工作的方法,以免丢失任何数据,从 PubSub 流式传输并写入 BigQuery。 我可以设想的一种可能方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道
问题: 使用 Cloud Dataflow 时,我们会看到 2 个指标(请参阅 this page): 系统延迟 数据新鲜度 这些在 Stackdriver 中也可用以下名称(摘自 here): sy
我一直在阅读 Dataflow SDK 文档,试图找出当数据到达流作业中的水印时会发生什么。 这一页: https://cloud.google.com/dataflow/model/windowin
有没有办法(或任何类型的黑客)从压缩文件中读取输入数据? 我的输入包含数百个文件,这些文件是用 gzip 压缩生成的,解压缩它们有些乏味。 最佳答案 Dataflow 现在支持从压缩文本源中读取(从
我正在尝试在 Dataflow 中执行联合操作。是否有用于在 Dataflow 中合并两个 PCollections 的示例代码? 最佳答案 一个简单的方法是像这样将 Flatten() 与 Remo
在我的管道上运行“更新”后,我注意到有新创建的永久磁盘在 10 多分钟后未附加到任何实例。 最佳答案 这是 Dataflow 服务的一个持续已知问题,会在管道更新过程中导致孤立磁盘。可以安全地删除这些
是否可以为 Dataflow 工作人员提供自定义包? 我想从计算内部输出到 Debian 打包的二进制文件。 编辑:需要明确的是,包配置非常复杂,仅将文件捆绑在 --filesToStage 中是不可
我想使用 Google Cloud Dataflow 创建 session 窗口,如 dataflow model paper 中所述。 .我想将我的未绑定(bind)数据发送到 Pub/Sub,然后
我正在尝试运行从 pubsub 主题读取并写入 bigquery 的管道。时间戳是从主题消息中解析出来的。但是,我收到了一条关于允许的时间戳偏差的错误,并引用了下面复制的文档。 getAllowedT
我有一个大型数据文件 (1 TB) 的数据要导入 BigQuery。每行包含一个键。在导入数据并创建我的 PCollection 以导出到 BigQuery 时,我想确保我不会基于此键值导入重复记录。
我正在通过 Python API 在 Dataflow 上使用 Apache Beam 从 Bigquery 读取数据,对其进行处理,然后将其转储到 Datastore 接收器中。 不幸的是,作业经常
我一直在研究使用 spring-cloud-dataflow 中的 spring-cloud-task 构建的项目。查看示例项目和文档后,似乎表明任务是通过仪表板或 shell 手动启动的。 spri
我有以下场景: 管道 A 在 BigQuery 中查找表 A,进行一些计算并返回列名列表。 此列名称列表用作管道 B 输出的 BigQuery 架构。 您能否让我知道实现这一目标的最佳选择是什么? 管
我是一名优秀的程序员,十分优秀!