gpt4 book ai didi

java - 使用 Apache Beam 和 Dataflow 的数据存储性能不佳

转载 作者:搜寻专家 更新时间:2023-10-31 19:59:45 26 4
gpt4 key购买 nike

我在 Datastore 写入速度方面遇到了巨大的性能问题。大多数时候它保持在 100 个元素/秒以下。

当使用数据存储客户端 (com.google.cloud:google-cloud-datastore) 在我的本地计算机上对写入速度进行基准测试并并行运行批量写入时,我能够达到大约 2600 个元素/秒的速度.

我已经使用 Java API 设置了一个简单的 Apache Beam 管道。这是它的图表:

full-pipeline

这是在没有 Datastore 节点的情况下运行时的速度:

pipeline-without-datastore-write

这样会快很多。这一切都表明 DatastoreV1.Write 是该管道​​中的瓶颈 - 从没有写入节点的管道速度和 DatastoreV1.Write 的壁时间与其他节点的壁时间相比来判断。


我尝试解决这个问题的方法:

• 增加初始 worker 的数量(尝试了 1 和 10,没有明显差异)。 Datastore 在一段时间后(可能在前 2 个节点完成处理之后)将写入次数减少到 1。基于此,我怀疑 DatastoreIO.v1().write() 不会并行运行其工作程序。为什么呢?

pipeline-log-workers

• 确保一切都在同一位置运行:GCP 项目、数据流管道工作人员和元数据、存储 - 全部设置为 us-central。这是建议here

• 尝试使用不同的实体 key 生成策略(根据 this post )。目前使用这种方法:Key.Builder keyBuilder = DatastoreHelper.makeKey("someKind", UUID.randomUUID().toString());。我不太确定这会生成足够均匀分布的 key ,但我想即使不是这样,性能也不应该这么低?


请注意,如果没有解决方法,我无法使用提供的 Apache Beam 和 Google 库:由于依赖问题,我不得不强制将 google-api-client 版本设为 1.22.0 并将 Guava 设为 23.0(请参阅例如 https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/607 )。

查看 DatastoreV1.Write 节点日志:

datastore-log-write

它每大约 5 秒推送一次 500 个实体,速度不是很快。

总体而言,DatastoreIO.v1().write() 速度似乎很慢,而且它的 worker 没有并行运行。知道如何解决这个问题或可能是什么原因吗?

最佳答案

我不应该让这个问题无人回答。

在联系 GCP 支持人员后,我得到了一个建议,认为原因可能是 TextIO.Read 节点从压缩 (gzipped) 文件中读取。显然这是一个不可并行化的操作。事实上,在为源切换到未压缩文件后,性能得到了提高。

建议的另一个解决方案是在从源代码读取后运行管道的手动重新分区。这意味着向管道中的项目添加任意键,按任意键分组,然后删除任意键。它也有效。这种方法归结为这段代码:

管道代码:

pipeline.apply(TextIO.read().from("foo").withCompression(Compression.GZIP)  
.apply(ParDo.of(new PipelineRepartitioner.AddArbitraryKey<>()))
.apply(GroupByKey.create())
.apply(ParDo.of(new PipelineRepartitioner.RemoveArbitraryKey<>()))
/* further transforms */

辅助类:

public class PipelineRepartitioner<T> {
public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
}
}

public static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
@ProcessElement
public void processElement(ProcessContext c) {
for (T s : c.element().getValue()) {
c.output(s);
}
}
}
}

我在 Apache Beam Jira 上看到了与该问题相关的工单,因此这可能会在未来得到解决。

关于java - 使用 Apache Beam 和 Dataflow 的数据存储性能不佳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48387097/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com