gpt4 book ai didi

google-cloud-dataflow - 如何确保我的数据流管道可扩展?

转载 作者:行者123 更新时间:2023-12-04 09:06:13 24 4
gpt4 key购买 nike

我们经常看到人们编写的数据流管道扩展性不好。这令人沮丧,因为Dataflow旨在透明地进行扩展,但是Dataflow管道中仍然存在一些反模式,因此很难进行扩展。有哪些常见的反模式和避免它们的技巧?

最佳答案

扩展数据流管道

嗨,鲁文·拉克斯(Reuven Lax)在这里。我是Dataflow工程团队的一员,负责领导流媒体运行器的设计和实施。在Dataflow之前,我领导了建立MillWheel的团队多年。 MillWheel在this VLDB 2013 paper中进行了描述,并且是Dataflow底层流技术的基础。

数据流通常不需要您过多考虑如何进行管道扩展。复杂的算法可以完成很多工作,这些算法可以自动并行化和调整跨多台机器的管道。但是,与任何此类系统一样,有些反模式可能会大规模地限制您的管道。在本文中,我们将介绍这些反模式中的三个,并讨论如何解决它们。假设您已经熟悉Dataflow编程模型。如果没有,我建议从我们的Getting Started guide和Tyler Akidau的Streaming 101Streaming 102博客文章开始。您还可以阅读VLDB 2015中发布的Dataflow model paper

今天,我们将讨论扩展您的管道,或更具体地说,为什么您的管道可能无法扩展。当我们说可伸缩性时,我们指的是管道随着输入大小的增加和密钥分布的变化而有效运行的能力。场景:您已经编写了一个很酷的新Dataflow管道,我们提供的高级操作易于编写。您已经使用DirectPipelineRunner在计算机上本地测试了此管道,一切看起来都很好。您甚至尝试将其部署在少量的Compute VM上,但看起来仍然很乐观。然后,您尝试按比例放大到更大的数据量,图片肯定变差了。对于批处理管道,完成管道所需的时间远远超过预期。对于流水线管道,数据流UI中报告的滞后随着流水线越来越远而不断增加。我们将解释可能发生这种情况的一些原因,以及如何解决这些问题。

昂贵的每条记录操作

我们看到的一个常见问题是流水线会对每个处理的记录执行不必要的昂贵或缓慢的操作。从技术上讲,这不是一个硬扩展的瓶颈-有了足够的资源,Dataflow仍可以将此管道分配到足够的计算机上以使其正常运行。但是,当运行数百万或数十亿条记录时,这些每条记录操作的成本加起来意外地大。通常,这些问题在规模较小时根本不会引起注意。

这是一个这样的操作的示例,该操作取自真实的Dataflow管道。

import javax.json.Json;
...
PCollection<OutType> output = input.apply(ParDo.of(new DoFn<InType, OutType>() {
public void processElement(ProcessContext c) {
JsonReader reader = Json.createReader();
// Perform some processing on entry.
...
}
}));


乍一看,这段代码并没有什么错,但是当大规模运行时,该管道运行得非常慢。

由于代码的实际业务逻辑不应该导致速度下降,因此我们怀疑某些事情会增加流水线的每条记录开销。为了获得更多关于此的信息,我们必须ssh到VM才能从worker中获取实际的线程配置文件。经过一些挖掘,我们发现线程通常卡在以下堆栈跟踪中:

java.util.zip.ZipFile.getEntry(ZipFile.java:308)
java.util.jar.JarFile.getEntry(JarFile.java:240)
java.util.jar.JarFile.getJarEntry(JarFile.java:223)
sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:983)
sun.misc.URLClassPath$1.next(URLClassPath.java:240)
sun.misc.URLClassPath$1.hasMoreElements(URLClassPath.java:250)
java.net.URLClassLoader$3$1.run(URLClassLoader.java:601)
java.net.URLClassLoader$3$1.run(URLClassLoader.java:599)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader$3.next(URLClassLoader.java:598)
java.net.URLClassLoader$3.hasMoreElements(URLClassLoader.java:623)
sun.misc.CompoundEnumeration.next(CompoundEnumeration.java:45)
sun.misc.CompoundEnumeration.hasMoreElements(CompoundEnumeration.java:54)
java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:354)
java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
javax.json.spi.JsonProvider.provider(JsonProvider.java:89)
javax.json.Json.createReader(Json.java:208)
<.....>.processElement(<filename>.java:174)


每次对 Json.createReader的调用都在搜索类路径,以尝试找到已注册的 JsonProvider。从堆栈跟踪中可以看到,这涉及加载和解压缩JAR文件。在大规模管道上按记录执行此操作的效果可能不太好!

此处的解决方案是为用户创建一个静态 JsonReaderFactory,并使用该实例化单个阅读器对象。您可能会想在Dataflow的 JsonReaderFactory方法中为每捆记录创建一个 startBundle。但是,尽管这对于批处理管道来说效果很好,但是在流模式下,捆绑包可能很小-有时只有几条记录。因此,我们也不建议每个捆绑包都进行昂贵的工作。即使您认为您的管道将仅以批处理方式使用,您将来也可能希望将其作为流管道运行。因此,通过确保它们在任何一种模式下均能正常运行,可以确保您的管道永不过时!

热键

数据流中的基本原语是 GroupByKeyGroupByKey允许将一组 PCollection个键值对分组,以便将特定键的所有值分组在一起以作为一个单元进行处理。 Dataflow的大多数内置汇总转换- CountTopCombine等-在封面下使用 GroupByKey。如果单个工作人员非常忙(例如,通过查看该工作的GCE工作组确定CPU使用率很高)而其他工作人员却处于空闲状态,那么您可能会遇到热键问题,但是管道会越来越落后。

为处理 DoFn的结果的 GroupByKey提供了 KV<KeyType, Iterable<ValueType>>的输入类型。这意味着该键的所有值的整个集合(如果使用窗口化,则在当前窗口内)被建模为单个 Iterable元素。特别是,这意味着该键的所有值必须在同一台计算机上,实际上是在同一线程上进行处理。热键的存在可能会导致性能问题-当一个或多个键接收数据的速度快于单个cpu上处理数据的速度时。例如,考虑以下代码片段

p.apply(Read.from(new UserWebEventSource())
.apply(new ExtractBrowserString())
.apply(Window.<Event>into(FixedWindow.of(1, Duration.standardSeconds(1))))
.apply(GroupByKey.<String, Event>create())
.apply(ParDo.of(new ProcessEventsByBrowser()));


此代码通过用户的网络浏览器键入所有用户事件,然后将每个浏览器的所有事件作为一个单元进行处理。但是,只有少数非常流行的浏览器(例如Chrome,IE,Firefox,Safari),并且这些键非常热-可能太热而无法在一个CPU上处理。除了性能之外,这也是可伸缩性的瓶颈。如果有四个热键,则在管道中添加更多工作线程将无济于事,因为这些键最多可以处理四个工作线程。您已经对管道进行了结构设计,以使Dataflow不会在不违反API合同的情况下进行扩展。

缓解这种情况的一种方法是将 ProcessEventsByBrowser DoFn构造为组合器。组合器是一种特殊的用户功能,它允许对可迭代对象进行分段处理。例如,如果目标是计算每个浏览器每秒的事件数,则可以使用 Count.perKey()代替 ParDo。数据流能够将部分合并操作提升到 GroupByKey之上,从而提供更多的并行性(对于那些来自数据库领域的人,这类似于将谓词下推);有些工作可以在前一个阶段完成,希望可以更好地分发。

不幸的是,尽管使用组合器通常会有所帮助,但这可能还不够-尤其是当热键非常热时;对于流管道尤其如此。当使用组合的全局变体( Combine.globally()Count.globally()Top.largest()等)时,您也可能会看到这种情况。在幕后,这些操作正在单个静态键上执行每个键的组合,如果此键的音量太大,则可能无法很好地执行。为了解决这个问题,我们允许您使用 Combine.PerKey.withHotKeyFanoutCombine.Globally.withFanout提供额外的并行提示。这些操作将在您的管道中创建一个额外的步骤,以便在目标计算机上执行最终聚合之前,预聚合许多计算机上的数据。这些操作没有神奇的数字,但是一般的策略是将任何热键拆分为足够的子分片,以使任何单个分片都处于管道可以维持的按工作人员吞吐量的水平之下。

大窗户

Dataflow提供了一种复杂的窗口化工具,可以根据时间存储数据。这在处理无边界数据时在流式管道中最有用,但是,批处理,有边界管道也完全支持它。当窗口策略已附加到PCollection时,任何后续的分组操作(最著名的 GroupByKey)将对每个窗口执行单独的分组。与仅提供全局同步窗口的其他系统不同,Dataflow分别窗口化每个键的数据。这就是我们提供灵活的每键窗口(例如 sessions)的功能。有关更多信息,建议您阅读Dataflow文档中的 windowing guide

由于窗口是每个键的事实,Dataflow在等待每个窗口关闭时在接收方缓冲元素。如果使用很长的窗户-例如一个24小时固定的窗口-这意味着必须缓冲大量数据,这可能是管道的性能瓶颈。这可能表现为缓慢(如热键),甚至表现为工作程序的内存不足错误(在日志中可见)。我们再次建议使用组合器以减小数据大小。编写此代码之间的区别:

pcollection.apply(Window.into(FixedWindows.of(1, TimeUnit.DAYS)))
.apply(GroupByKey.<KeyType, ValueType>create())
.apply(ParDo.of(new DoFn<KV<KeyType, Iterable<ValueType>>, Long>() {
public void processElement(ProcessContext c) {
c.output(c.element().size());
}
}));


… 还有这个 ...

pcollection.apply(Window.into(FixedWindows.of(1, TimeUnit.DAYS)))
.apply(Count.perKey());


……不只是简洁在后一个代码段中,Dataflow知道正在应用计数组合器,因此无论窗口有多长,都只需要存储每个键到目前为止的计数。相比之下,即使两个代码段在逻辑上是等效的,Dataflow对第一个代码段的了解也较少,并且被迫在接收器上缓冲一整天的数据!

如果无法将您的操作表示为组合器,则建议您查看 triggers API。这将允许您在关闭窗口之前乐观地处理窗口的各个部分,从而减小缓冲数据的大小。

请注意,这些限制中的许多不适用于批处理运行器。但是,如上所述,将来最好对管道进行验证,并确保它在两种模式下都能正常运行。

我们已经讨论了热键,大窗口和昂贵的每次记录操作。其他指南可以在我们的 documentation中找到。尽管本文着重介绍了在扩展管道时可能遇到的挑战,但是Dataflow的许多好处在很大程度上是透明的-例如动态工作重新平衡以最大程度地减少散乱的影响,基于吞吐量的自动缩放以及作业资源管理等可以适应多种不同的情况管道和数据形状,无需用户干预。我们一直在努力使我们的系统更具适应性,并计划随着时间的流逝将上述一些策略自动纳入核心执行引擎。感谢您的阅读,并祝您数据流愉快!

关于google-cloud-dataflow - 如何确保我的数据流管道可扩展?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34956180/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com