- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们经常看到人们编写的数据流管道扩展性不好。这令人沮丧,因为Dataflow旨在透明地进行扩展,但是Dataflow管道中仍然存在一些反模式,因此很难进行扩展。有哪些常见的反模式和避免它们的技巧?
最佳答案
扩展数据流管道
嗨,鲁文·拉克斯(Reuven Lax)在这里。我是Dataflow工程团队的一员,负责领导流媒体运行器的设计和实施。在Dataflow之前,我领导了建立MillWheel的团队多年。 MillWheel在this VLDB 2013 paper中进行了描述,并且是Dataflow底层流技术的基础。
数据流通常不需要您过多考虑如何进行管道扩展。复杂的算法可以完成很多工作,这些算法可以自动并行化和调整跨多台机器的管道。但是,与任何此类系统一样,有些反模式可能会大规模地限制您的管道。在本文中,我们将介绍这些反模式中的三个,并讨论如何解决它们。假设您已经熟悉Dataflow编程模型。如果没有,我建议从我们的Getting Started guide和Tyler Akidau的Streaming 101和Streaming 102博客文章开始。您还可以阅读VLDB 2015中发布的Dataflow model paper。
今天,我们将讨论扩展您的管道,或更具体地说,为什么您的管道可能无法扩展。当我们说可伸缩性时,我们指的是管道随着输入大小的增加和密钥分布的变化而有效运行的能力。场景:您已经编写了一个很酷的新Dataflow管道,我们提供的高级操作易于编写。您已经使用DirectPipelineRunner
在计算机上本地测试了此管道,一切看起来都很好。您甚至尝试将其部署在少量的Compute VM上,但看起来仍然很乐观。然后,您尝试按比例放大到更大的数据量,图片肯定变差了。对于批处理管道,完成管道所需的时间远远超过预期。对于流水线管道,数据流UI中报告的滞后随着流水线越来越远而不断增加。我们将解释可能发生这种情况的一些原因,以及如何解决这些问题。
昂贵的每条记录操作
我们看到的一个常见问题是流水线会对每个处理的记录执行不必要的昂贵或缓慢的操作。从技术上讲,这不是一个硬扩展的瓶颈-有了足够的资源,Dataflow仍可以将此管道分配到足够的计算机上以使其正常运行。但是,当运行数百万或数十亿条记录时,这些每条记录操作的成本加起来意外地大。通常,这些问题在规模较小时根本不会引起注意。
这是一个这样的操作的示例,该操作取自真实的Dataflow管道。
import javax.json.Json;
...
PCollection<OutType> output = input.apply(ParDo.of(new DoFn<InType, OutType>() {
public void processElement(ProcessContext c) {
JsonReader reader = Json.createReader();
// Perform some processing on entry.
...
}
}));
java.util.zip.ZipFile.getEntry(ZipFile.java:308)
java.util.jar.JarFile.getEntry(JarFile.java:240)
java.util.jar.JarFile.getJarEntry(JarFile.java:223)
sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:983)
sun.misc.URLClassPath$1.next(URLClassPath.java:240)
sun.misc.URLClassPath$1.hasMoreElements(URLClassPath.java:250)
java.net.URLClassLoader$3$1.run(URLClassLoader.java:601)
java.net.URLClassLoader$3$1.run(URLClassLoader.java:599)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader$3.next(URLClassLoader.java:598)
java.net.URLClassLoader$3.hasMoreElements(URLClassLoader.java:623)
sun.misc.CompoundEnumeration.next(CompoundEnumeration.java:45)
sun.misc.CompoundEnumeration.hasMoreElements(CompoundEnumeration.java:54)
java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:354)
java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
javax.json.spi.JsonProvider.provider(JsonProvider.java:89)
javax.json.Json.createReader(Json.java:208)
<.....>.processElement(<filename>.java:174)
Json.createReader
的调用都在搜索类路径,以尝试找到已注册的
JsonProvider
。从堆栈跟踪中可以看到,这涉及加载和解压缩JAR文件。在大规模管道上按记录执行此操作的效果可能不太好!
JsonReaderFactory
,并使用该实例化单个阅读器对象。您可能会想在Dataflow的
JsonReaderFactory
方法中为每捆记录创建一个
startBundle
。但是,尽管这对于批处理管道来说效果很好,但是在流模式下,捆绑包可能很小-有时只有几条记录。因此,我们也不建议每个捆绑包都进行昂贵的工作。即使您认为您的管道将仅以批处理方式使用,您将来也可能希望将其作为流管道运行。因此,通过确保它们在任何一种模式下均能正常运行,可以确保您的管道永不过时!
GroupByKey
。
GroupByKey
允许将一组
PCollection
个键值对分组,以便将特定键的所有值分组在一起以作为一个单元进行处理。 Dataflow的大多数内置汇总转换-
Count
,
Top
,
Combine
等-在封面下使用
GroupByKey
。如果单个工作人员非常忙(例如,通过查看该工作的GCE工作组确定CPU使用率很高)而其他工作人员却处于空闲状态,那么您可能会遇到热键问题,但是管道会越来越落后。
DoFn
的结果的
GroupByKey
提供了
KV<KeyType, Iterable<ValueType>>
的输入类型。这意味着该键的所有值的整个集合(如果使用窗口化,则在当前窗口内)被建模为单个
Iterable
元素。特别是,这意味着该键的所有值必须在同一台计算机上,实际上是在同一线程上进行处理。热键的存在可能会导致性能问题-当一个或多个键接收数据的速度快于单个cpu上处理数据的速度时。例如,考虑以下代码片段
p.apply(Read.from(new UserWebEventSource())
.apply(new ExtractBrowserString())
.apply(Window.<Event>into(FixedWindow.of(1, Duration.standardSeconds(1))))
.apply(GroupByKey.<String, Event>create())
.apply(ParDo.of(new ProcessEventsByBrowser()));
ProcessEventsByBrowser
DoFn构造为组合器。组合器是一种特殊的用户功能,它允许对可迭代对象进行分段处理。例如,如果目标是计算每个浏览器每秒的事件数,则可以使用
Count.perKey()
代替
ParDo
。数据流能够将部分合并操作提升到
GroupByKey
之上,从而提供更多的并行性(对于那些来自数据库领域的人,这类似于将谓词下推);有些工作可以在前一个阶段完成,希望可以更好地分发。
Combine.globally()
,
Count.globally()
,
Top.largest()
等)时,您也可能会看到这种情况。在幕后,这些操作正在单个静态键上执行每个键的组合,如果此键的音量太大,则可能无法很好地执行。为了解决这个问题,我们允许您使用
Combine.PerKey.withHotKeyFanout
或
Combine.Globally.withFanout
提供额外的并行提示。这些操作将在您的管道中创建一个额外的步骤,以便在目标计算机上执行最终聚合之前,预聚合许多计算机上的数据。这些操作没有神奇的数字,但是一般的策略是将任何热键拆分为足够的子分片,以使任何单个分片都处于管道可以维持的按工作人员吞吐量的水平之下。
GroupByKey
)将对每个窗口执行单独的分组。与仅提供全局同步窗口的其他系统不同,Dataflow分别窗口化每个键的数据。这就是我们提供灵活的每键窗口(例如
sessions)的功能。有关更多信息,建议您阅读Dataflow文档中的
windowing guide。
pcollection.apply(Window.into(FixedWindows.of(1, TimeUnit.DAYS)))
.apply(GroupByKey.<KeyType, ValueType>create())
.apply(ParDo.of(new DoFn<KV<KeyType, Iterable<ValueType>>, Long>() {
public void processElement(ProcessContext c) {
c.output(c.element().size());
}
}));
pcollection.apply(Window.into(FixedWindows.of(1, TimeUnit.DAYS)))
.apply(Count.perKey());
关于google-cloud-dataflow - 如何确保我的数据流管道可扩展?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34956180/
我正在使用 Assets 管道来管理我的 Grails 3.0 应用程序的前端资源。但是,似乎没有创建 CoffeeScript 文件的源映射。有什么办法可以启用它吗? 我的 build.gradle
我有一个我想要的管道: 提供一些资源, 运行一些测试, 拆资源。 我希望第 3 步中的拆卸任务运行 不管 测试是否通过或失败,在第 2 步。据我所知 runAfter如果前一个任务成功,则只运行一个任
如果我运行以下命令: Measure-Command -Expression {gci -Path C:\ -Recurse -ea SilentlyContinue | where Extensio
我知道管道是一个特殊字符,我需要使用: Scanner input = new Scanner(System.in); String line = input.next
我再次遇到同样的问题,我有我的默认处理方式,但它一直困扰着我。 有没有更好的办法? 所以基本上我有一个运行的管道,在管道内做一些事情,并想从管道内返回一个键/值对。 我希望整个管道返回一个类型为 ps
我有三个环境:dev、hml 和 qa。 在我的管道中,根据分支,阶段有一个条件来检查它是否会运行: - stage: Project_Deploy_DEV condition: eq(varia
我有 Jenkins Jenkins ver. 2.82 正在运行并想在创建新作业时使用 Pipeline 功能。但我没有看到这个列为选项。我只能在自由式项目、maven 项目、外部项目和多配置之间进
在对上一个问题 (haskell-data-hashset-from-unordered-container-performance-for-large-sets) 进行一些观察时,我偶然发现了一个奇
我正在寻找有关如何使用管道将标准输出作为其他命令的参数传递的见解。 例如,考虑这种情况: ls | grep Hello grep 的结构遵循以下模式:grep SearchTerm PathOfFi
有没有办法不因声明性管道步骤而失败,而是显示警告?目前我正在通过添加 || exit 0 来规避它到 sh 命令行的末尾,所以它总是可以正常退出。 当前示例: sh 'vendor/bin/phpcs
我们正在从旧的 Jenkins 设置迁移到所有计划都是声明性 jenkinsfile 管道的新服务器……但是,通过使用管道,我们无法再手动清除工作区。我如何设置 Jenkins 以允许 手动点播清理工
我在 Python 中阅读了有关 Pipelines 和 GridSearchCV 的以下示例: http://www.davidsbatista.net/blog/2017/04/01/docume
我有一个这样的管道脚本: node('linux'){ stage('Setup'){ echo "Build Stage" } stage('Build'){ echo
我正在使用 bitbucket 管道进行培训 这是我的 bitbucket-pipelines.yml: image: php:7.2.9 pipelines: default:
我正在编写一个程序,其中输入文件被拆分为多个文件(Shamir 的 secret 共享方案)。 这是我想象的管道: 来源:使用 Conduit.Binary.sourceFile 从输入中读取 导管:
我创建了一个管道,它有一个应该只在开发分支上执行的阶段。该阶段还需要用户输入。即使我在不同的分支上,为什么它会卡在这些步骤的用户输入上?当我提供输入时,它们会被正确跳过。 stage('Deplo
我正在尝试学习管道功能(%>%)。 当试图从这行代码转换到另一行时,它不起作用。 ---- R代码--原版----- set.seed(1014) replicate(6,sample(1:8))
在 Jenkins Pipeline 中,如何将工件从以前的构建复制到当前构建? 即使之前的构建失败,我也想这样做。 最佳答案 Stuart Rowe 还在 Pipeline Authoring Si
我正在尝试使用 执行已定义的作业构建 使用 Jenkins 管道的方法。 这是一个简单的例子: build('jenkins-test-project-build', param1 : 'some-
当我使用 where 过滤器通过管道命令排除对象时,它没有给我正确的输出。 PS C:\Users\Administrator> $proall = Get-ADComputer -filter *
我是一名优秀的程序员,十分优秀!