gpt4 book ai didi

java - 简单的 Apache Beam 操作工作起来非常慢

转载 作者:行者123 更新时间:2023-11-30 07:53:27 25 4
gpt4 key购买 nike

我是 Apache Beam 的新手,我的 Java 技能很低,但我想了解为什么我的简单条目操作在 Apache Beam 上运行如此缓慢。

我要执行的操作如下:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 排名前 100 万的站点):NUMBER,DOMAIN(例如 1,google.com), 我想“剥离”第一个(数字)字段并只获取域部分。此管道的代码如下:

package misc.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

static class ExtractDomainsFn extends DoFn<String, String> {
private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");

@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
domains.inc();

String domain = c.element().split(",")[1];
c.output(domain);
}
}
}

public static void main(String[] args) {
Pipeline p = Pipeline.create();

p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("domains"));

p.run().waitUntilFinish();
}
}

当我使用 Maven 执行此代码时,在我的笔记本电脑上需要四分多钟才能成功:

$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------

虽然简单的 cut(1) 在您眨眼之前就可以工作:

$time cut -d, -f2 top-1m.csv > domains

real 0m0.171s
user 0m0.140s
sys 0m0.028s

那么,这样的 Apache Beam 行为是否被认为是可以接受的(可能它在处理大量数据时效果更好),还是我的代码效率低下?

01-07-2014 更新:

作为肯·诺尔斯 suggested ,我尝试在 DirectRunner 之外的其他运行器上运行管道 — 在 DataflowRunner 上。所以更新后的代码如下所示:

package misc.examples;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

static class ExtractDomainsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}

public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject("my-gcp-project-id");
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));

p.run().waitUntilFinish();
}
}

与直接运行器相比,在 Google Dataflow 上运行的运行时间更短,但仍然足够慢 — 3 分钟多一点:

Google Dataflow Job

Google Dataflow Job Logs

最佳答案

Apache Beam 在 Apache Flink、Apache Spark、Apache Apex 和 Google Cloud Dataflow 等大规模数据处理引擎上提供正确的事件时间处理和可移植性。

在这里,您似乎在默认的 DirectRunner 中运行您的管道,这是一种小规模测试管道正确性的方法(其中“小”意味着不使用多台机器)。为了测试正确性,运行器还会执行额外的任务来帮助确保正确性,例如检查您的序列化 (Coder) 以及以随机顺序放置元素以确保您的管道不依赖于顺序。

DirectRunner 不一定必须一次将所有值都存入内存,但它有一个流式执行模型,因此它也适用于无限数据集和触发。与简单循环相比,这也会增加开销。

也就是说,四分钟很慢,我提交了 BEAM-2516跟进。

您也可以尝试在其他后端上运行它,特别是 SparkRunnerFlinkRunnerApexRunner 支持在您的笔记本电脑上嵌入执行.

对 2017-07-01 更新的回应:

虽然您在 Cloud Dataflow 上的总运行时间约为 3 分钟,但处理数据的实际时间约为 1 分钟。您可以在日志中看到这一点。其余的是启动和关闭工作虚拟机。我们一直在努力减少这种开销。为什么需要大约 1 分钟?您必须分析才能找出答案(我很想听听结果!)但 Dataflow 肯定做的不仅仅是cut:从 GCS 读取和写入,提供持久性和容错性,并且在 TextIO 写入步骤中,它正在执行数据的网络随机播放,以便并行写入分片文件。如果 Dataflow 注意到您的计算没有并行性并且足够小以至于不需要并行性,那么显然可以优化一些事情。

但请记住,Beam 和 Cloud Dataflow 的存在是为了帮助您对无法在单台机器上及时处理的大量数据使用并行处理。因此,处理没有可用并行性的微小示例不是目标。

次要顺序计算通常作为大型管道的一小部分发生,但在实际物理计划的背景下,小型辅助计算通常不会影响端到端时间。 VM 管理的开销也是一次性成本,因此它们更有可能根据数十到数百台机器上的数分钟到数小时的计算来衡量。

关于java - 简单的 Apache Beam 操作工作起来非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44736414/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com