- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是 Apache Beam 的新手,我的 Java 技能很低,但我想了解为什么我的简单条目操作在 Apache Beam 上运行如此缓慢。
我要执行的操作如下:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 排名前 100 万的站点):NUMBER,DOMAIN
(例如 1,google.com
), 我想“剥离”第一个(数字)字段并只获取域部分。此管道的代码如下:
package misc.examples;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class Example {
static class ExtractDomainsFn extends DoFn<String, String> {
private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
domains.inc();
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create();
p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("domains"));
p.run().waitUntilFinish();
}
}
当我使用 Maven 执行此代码时,在我的笔记本电脑上需要四分多钟才能成功:
$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------
虽然简单的 cut(1)
在您眨眼之前就可以工作:
$time cut -d, -f2 top-1m.csv > domains
real 0m0.171s
user 0m0.140s
sys 0m0.028s
那么,这样的 Apache Beam 行为是否被认为是可以接受的(可能它在处理大量数据时效果更好),还是我的代码效率低下?
01-07-2014 更新:
作为肯·诺尔斯 suggested ,我尝试在 DirectRunner
之外的其他运行器上运行管道 — 在 DataflowRunner
上。所以更新后的代码如下所示:
package misc.examples;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class Example {
static class ExtractDomainsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains(",")) {
String domain = c.element().split(",")[1];
c.output(domain);
}
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject("my-gcp-project-id");
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
.apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
.apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));
p.run().waitUntilFinish();
}
}
与直接运行器相比,在 Google Dataflow 上运行的运行时间更短,但仍然足够慢 — 3 分钟多一点:
最佳答案
Apache Beam 在 Apache Flink、Apache Spark、Apache Apex 和 Google Cloud Dataflow 等大规模数据处理引擎上提供正确的事件时间处理和可移植性。
在这里,您似乎在默认的 DirectRunner
中运行您的管道,这是一种小规模测试管道正确性的方法(其中“小”意味着不使用多台机器)。为了测试正确性,运行器还会执行额外的任务来帮助确保正确性,例如检查您的序列化 (Coder
) 以及以随机顺序放置元素以确保您的管道不依赖于顺序。
DirectRunner
不一定必须一次将所有值都存入内存,但它有一个流式执行模型,因此它也适用于无限数据集和触发。与简单循环相比,这也会增加开销。
也就是说,四分钟很慢,我提交了 BEAM-2516跟进。
您也可以尝试在其他后端上运行它,特别是 SparkRunner
、FlinkRunner
和 ApexRunner
支持在您的笔记本电脑上嵌入执行.
对 2017-07-01 更新的回应:
虽然您在 Cloud Dataflow 上的总运行时间约为 3 分钟,但处理数据的实际时间约为 1 分钟。您可以在日志中看到这一点。其余的是启动和关闭工作虚拟机。我们一直在努力减少这种开销。为什么需要大约 1 分钟?您必须分析才能找出答案(我很想听听结果!)但 Dataflow 肯定做的不仅仅是cut
:从 GCS 读取和写入,提供持久性和容错性,并且在 TextIO
写入步骤中,它正在执行数据的网络随机播放,以便并行写入分片文件。如果 Dataflow 注意到您的计算没有并行性并且足够小以至于不需要并行性,那么显然可以优化一些事情。
但请记住,Beam 和 Cloud Dataflow 的存在是为了帮助您对无法在单台机器上及时处理的大量数据使用并行处理。因此,处理没有可用并行性的微小示例不是目标。
次要顺序计算通常作为大型管道的一小部分发生,但在实际物理计划的背景下,小型辅助计算通常不会影响端到端时间。 VM 管理的开销也是一次性成本,因此它们更有可能根据数十到数百台机器上的数分钟到数小时的计算来衡量。
关于java - 简单的 Apache Beam 操作工作起来非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44736414/
我正在努力做到这一点 在我的操作中从数据库获取对象列表(确定) 在 JSP 上打印(确定) 此列表作为 JSP 中的可编辑表出现。我想修改然后将其提交回同一操作以将其保存在我的数据库中(失败。当我使用
我有以下形式的 Linq to Entities 查询: var x = from a in SomeData where ... some conditions ... select
我有以下查询。 var query = Repository.Query() .Where(p => !p.IsDeleted && p.Article.ArticleSections.Cou
我正在编写一个应用程序包,其中包含一个主类,其中主方法与GUI类分开,GUI类包含一个带有jtabbedpane的jframe,它有两个选项卡,第一个选项卡包含一个jtable,称为jtable1,第
以下代码产生错误 The nested query is not supported. Operation1='Case' Operation2='Collect' 问题是我做错了什么?我该如何解决?
我已经为 HA redis 集群(2 个副本、1 个主节点、3 个哨兵)设置了本地 docker 环境。只有哨兵暴露端口(10021、10022、10023)。 我使用的是 stackexchange
我正在 Desk.com 中构建一个“集成 URL”,它使用 Shopify Liquid 模板过滤器语法。对于开始日期为 7 天前而结束日期为现在的查询,此 URL 需要包含“开始日期”和“结束日期
你一定想过。然而情况却不理想,python中只能使用类似于 i++/i--等操作。 python中的自增操作 下面代码几乎是所有程序员在python中进行自增(减)操作的常用
我需要在每个使用 github 操作的手动构建中显示分支。例如:https://gyazo.com/2131bf83b0df1e2157480e5be842d4fb 我应该显示分支而不是一个。 最佳答
我有一个关于 Perl qr 运算符的问题: #!/usr/bin/perl -w &mysplit("a:b:c", /:/); sub mysplit { my($str, $patt
我已经使用 ArgoUML 创建了一个 ERD(实体关系图),我希望在一个类中创建两个操作,它们都具有 void 返回类型。但是,我只能创建一个返回 void 类型的操作。 例如: 我能够将 book
Github 操作仍处于测试阶段并且很新,但我希望有人可以提供帮助。我认为可以在主分支和拉取请求上运行 github 操作,如下所示: on: pull_request push: b
我正在尝试创建一个 Twilio 工作流来调用电话并记录用户所说的内容。为此,我正在使用 Record,但我不确定要在 action 参数中放置什么。 尽管我知道 Twilio 会发送有关调用该 UR
我不确定这是否可行,但值得一试。我正在使用模板缓冲区来减少使用此算法的延迟渲染器中光体积的过度绘制(当相机位于体积之外时): 使用廉价的着色器,将深度测试设置为 LEQUAL 绘制背面,将它们标记在模
有没有聪明的方法来复制 和 重命名 文件通过 GitHub 操作? 我想将一些自述文件复制到 /docs文件夹(:= 同一个 repo,不是远程的!),它们将根据它们的 frontmatter 重命名
我有一个 .csv 文件,其中第一列包含用户名。它们采用 FirstName LastName 的形式。我想获取 FirstName 并将 LastName 的第一个字符添加到它上面,然后删除空格。然
Sitecore 根据 Sitecore 树中定义的项目名称生成 URL, http://samplewebsite/Pages/Sample Page 但我们的客户有兴趣降低所有 URL(页面/示例
我正在尝试进行一些计算,但是一旦我输入金额,它就会完成。我只是希望通过单击按钮而不是自动发生这种情况。 到目前为止我做了什么: Angular JS - programming-fr
我的公司创建了一种在环境之间移动文件的复杂方法,现在我们希望将某些构建的 JS 文件(已转换和缩小)从一个 github 存储库移动到另一个。使用 github 操作可以实现这一点吗? 最佳答案 最简
在我的代码中,我创建了一个 JSONArray 对象。并向 JSONArray 对象添加了两个 JSONObject。我使用的是 json-simple-1.1.jar。我的代码是 package j
我是一名优秀的程序员,十分优秀!