- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们经常看到人们编写的数据流管道扩展性不好。这令人沮丧,因为Dataflow旨在透明地进行扩展,但是Dataflow管道中仍然存在一些反模式,因此很难进行扩展。有哪些常见的反模式和避免它们的技巧?
最佳答案
扩展数据流管道
嗨,鲁文·拉克斯(Reuven Lax)在这里。我是Dataflow工程团队的一员,负责领导流媒体运行器的设计和实施。在Dataflow之前,我领导了建立MillWheel的团队多年。 MillWheel在this VLDB 2013 paper中进行了描述,并且是Dataflow底层流技术的基础。
数据流通常不需要您过多考虑如何进行管道扩展。复杂的算法可以完成很多工作,这些算法可以自动并行化和调整跨多台机器的管道。但是,与任何此类系统一样,有些反模式可能会大规模地限制您的管道。在本文中,我们将介绍这些反模式中的三个,并讨论如何解决它们。假设您已经熟悉Dataflow编程模型。如果没有,我建议从我们的Getting Started guide和Tyler Akidau的Streaming 101和Streaming 102博客文章开始。您还可以阅读VLDB 2015中发布的Dataflow model paper。
今天,我们将讨论扩展您的管道,或更具体地说,为什么您的管道可能无法扩展。当我们说可伸缩性时,我们指的是管道随着输入大小的增加和密钥分布的变化而有效运行的能力。场景:您已经编写了一个很酷的新Dataflow管道,我们提供的高级操作易于编写。您已经使用DirectPipelineRunner
在计算机上本地测试了此管道,一切看起来都很好。您甚至尝试将其部署在少量的Compute VM上,但看起来仍然很乐观。然后,您尝试按比例放大到更大的数据量,图片肯定变差了。对于批处理管道,完成管道所需的时间远远超过预期。对于流水线管道,数据流UI中报告的滞后随着流水线越来越远而不断增加。我们将解释可能发生这种情况的一些原因,以及如何解决这些问题。
昂贵的每条记录操作
我们看到的一个常见问题是流水线会对每个处理的记录执行不必要的昂贵或缓慢的操作。从技术上讲,这不是一个硬扩展的瓶颈-有了足够的资源,Dataflow仍可以将此管道分配到足够的计算机上以使其正常运行。但是,当运行数百万或数十亿条记录时,这些每条记录操作的成本加起来意外地大。通常,这些问题在规模较小时根本不会引起注意。
这是一个这样的操作的示例,该操作取自真实的Dataflow管道。
import javax.json.Json;
...
PCollection<OutType> output = input.apply(ParDo.of(new DoFn<InType, OutType>() {
public void processElement(ProcessContext c) {
JsonReader reader = Json.createReader();
// Perform some processing on entry.
...
}
}));
java.util.zip.ZipFile.getEntry(ZipFile.java:308)
java.util.jar.JarFile.getEntry(JarFile.java:240)
java.util.jar.JarFile.getJarEntry(JarFile.java:223)
sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:983)
sun.misc.URLClassPath$1.next(URLClassPath.java:240)
sun.misc.URLClassPath$1.hasMoreElements(URLClassPath.java:250)
java.net.URLClassLoader$3$1.run(URLClassLoader.java:601)
java.net.URLClassLoader$3$1.run(URLClassLoader.java:599)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader$3.next(URLClassLoader.java:598)
java.net.URLClassLoader$3.hasMoreElements(URLClassLoader.java:623)
sun.misc.CompoundEnumeration.next(CompoundEnumeration.java:45)
sun.misc.CompoundEnumeration.hasMoreElements(CompoundEnumeration.java:54)
java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:354)
java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
javax.json.spi.JsonProvider.provider(JsonProvider.java:89)
javax.json.Json.createReader(Json.java:208)
<.....>.processElement(<filename>.java:174)
Json.createReader
的调用都在搜索类路径,以尝试找到已注册的
JsonProvider
。从堆栈跟踪中可以看到,这涉及加载和解压缩JAR文件。在大规模管道上按记录执行此操作的效果可能不太好!
JsonReaderFactory
,并使用该实例化单个阅读器对象。您可能会想在Dataflow的
JsonReaderFactory
方法中为每捆记录创建一个
startBundle
。但是,尽管这对于批处理管道来说效果很好,但是在流模式下,捆绑包可能很小-有时只有几条记录。因此,我们也不建议每个捆绑包都进行昂贵的工作。即使您认为您的管道将仅以批处理方式使用,您将来也可能希望将其作为流管道运行。因此,通过确保它们在任何一种模式下均能正常运行,可以确保您的管道永不过时!
GroupByKey
。
GroupByKey
允许将一组
PCollection
个键值对分组,以便将特定键的所有值分组在一起以作为一个单元进行处理。 Dataflow的大多数内置汇总转换-
Count
,
Top
,
Combine
等-在封面下使用
GroupByKey
。如果单个工作人员非常忙(例如,通过查看该工作的GCE工作组确定CPU使用率很高)而其他工作人员却处于空闲状态,那么您可能会遇到热键问题,但是管道会越来越落后。
DoFn
的结果的
GroupByKey
提供了
KV<KeyType, Iterable<ValueType>>
的输入类型。这意味着该键的所有值的整个集合(如果使用窗口化,则在当前窗口内)被建模为单个
Iterable
元素。特别是,这意味着该键的所有值必须在同一台计算机上,实际上是在同一线程上进行处理。热键的存在可能会导致性能问题-当一个或多个键接收数据的速度快于单个cpu上处理数据的速度时。例如,考虑以下代码片段
p.apply(Read.from(new UserWebEventSource())
.apply(new ExtractBrowserString())
.apply(Window.<Event>into(FixedWindow.of(1, Duration.standardSeconds(1))))
.apply(GroupByKey.<String, Event>create())
.apply(ParDo.of(new ProcessEventsByBrowser()));
ProcessEventsByBrowser
DoFn构造为组合器。组合器是一种特殊的用户功能,它允许对可迭代对象进行分段处理。例如,如果目标是计算每个浏览器每秒的事件数,则可以使用
Count.perKey()
代替
ParDo
。数据流能够将部分合并操作提升到
GroupByKey
之上,从而提供更多的并行性(对于那些来自数据库领域的人,这类似于将谓词下推);有些工作可以在前一个阶段完成,希望可以更好地分发。
Combine.globally()
,
Count.globally()
,
Top.largest()
等)时,您也可能会看到这种情况。在幕后,这些操作正在单个静态键上执行每个键的组合,如果此键的音量太大,则可能无法很好地执行。为了解决这个问题,我们允许您使用
Combine.PerKey.withHotKeyFanout
或
Combine.Globally.withFanout
提供额外的并行提示。这些操作将在您的管道中创建一个额外的步骤,以便在目标计算机上执行最终聚合之前,预聚合许多计算机上的数据。这些操作没有神奇的数字,但是一般的策略是将任何热键拆分为足够的子分片,以使任何单个分片都处于管道可以维持的按工作人员吞吐量的水平之下。
GroupByKey
)将对每个窗口执行单独的分组。与仅提供全局同步窗口的其他系统不同,Dataflow分别窗口化每个键的数据。这就是我们提供灵活的每键窗口(例如
sessions)的功能。有关更多信息,建议您阅读Dataflow文档中的
windowing guide。
pcollection.apply(Window.into(FixedWindows.of(1, TimeUnit.DAYS)))
.apply(GroupByKey.<KeyType, ValueType>create())
.apply(ParDo.of(new DoFn<KV<KeyType, Iterable<ValueType>>, Long>() {
public void processElement(ProcessContext c) {
c.output(c.element().size());
}
}));
pcollection.apply(Window.into(FixedWindows.of(1, TimeUnit.DAYS)))
.apply(Count.perKey());
关于google-cloud-dataflow - 如何确保我的数据流管道可扩展?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34956180/
我有一个 Cloud Run 服务,它通过 SQLAlchemy 访问 Cloud SQL 实例.但是,在 Cloud Run 的日志中,我看到 CloudSQL connection failed.
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 4年前关闭。 Improve t
在将 docker 容器镜像部署到 Cloud Run 时,我可以选择一个区域,这很好。 Cloud Run 将构建委托(delegate)给 Cloud Build,后者显然会创建两个存储桶来实现这
我正在尝试将 Cloud Functions 用作由 PubSub 触发的异步后台工作程序,并进行更长时间的工作(以分钟为单位)。完整代码在这里https://github.com/zdenulo/c
这是/etc/cloud/cloud.cfg的内容Ubuntu云16.04镜像: # The top level settings are used as module # and system co
如何从 Google Cloud Function 启动 Cloud Dataflow 作业?我想使用 Google Cloud Functions 作为启用跨服务组合的机制。 最佳答案 我已经包含了
我想使用 Cloud Shell 在我的第二代 Cloud Sql 实例上运行数据库迁移。 我找到了一个 example in the docs关于如何使用 gcloud 进行连接.但是当我运行命令时
我正在尝试使用 Google Cloud PubSub和我的 Google Cloud Dataproc群集,我收到如下身份验证范围错误: { "code" : 403, "errors" :
这是我的用例。 我已经有一个以私有(private)模式部署的 Cloud Run 服务。 (与云功能相同的问题) 我正在开发使用此 Cloud Run 的新服务。我在应用程序中使用默认凭据进行身份验
如何连接到 Cloud SQL 上的数据库,而无需在容器中添加我的凭据文件? 最佳答案 使用 UNIX 域套接字 (Java) 从云运行(完全托管)连接到云 SQL At this time Clou
我有一个google-cloud-ml作业,需要从gs存储桶加载numpy .npz文件。我遵循了this example上关于如何从gs加载.npy文件的操作,但是由于.npz文件已压缩,因此它对我
我想创建链接到另一个项目中的 Cloud Source Repository 的 Cloud Build 触发器。但是当我在应该选择存储库的步骤中时,列表是空的。我尝试了不同的许可,但没有运气。谁能告
向 Twilio 发送 SMS 时,Twilio 会向指定的 URL 发送多个请求,以通过 Webhook 提供该 SMS 传送的状态。我想让这个回调异步,所以我开发了一个 Cloud Functio
我需要更改我的项目 ID,因为要验证的 Firebase 身份验证链接在链接上显示了项目 ID,并且由于品牌 reshape ,项目名称已更改。根据我发现的信息,更改项目 ID 似乎不太可能。我正在考
用于部署我的 Angular 应用程序的 CI/CD 管道已关闭,但我看到 Google Cloud Run 在容器镜像更新后没有部署新修订版。 我已将 Cloud Build 设置为在 GitHub
报价https://cloud.google.com/load-balancing/docs/https/setting-up-https-serverless#enabling While Goog
Cloud Spanner 提供了两种不同的 API。 Cloud Spanner 读取与 Cloud Spanner SQL API 之间有什么区别? 最佳答案 在幕后,它们都使用相同的执行机制,因
我是 GCP 堆栈的新手,所以我对用于存储数据的 GCP 技术数量感到非常困惑: https://cloud.google.com/products/storage 虽然上面的文章中没有提到googl
我发现 Google Cloud Functions 的网络出站费用令人惊讶,我正在尝试了解发生这种情况的原因以及如何避免这种情况。 Stackdriver 监控表明有问题的函数是我的 ingest
我使用 Prisma使用 Cloud Run 和 Cloud SQL。在向 prisma.schema 提供 DATABASE_URL 后,它会在运行时抛出一个错误。 Can't reach data
我是一名优秀的程序员,十分优秀!