- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个项目,它有一个 apache beam 管道,其依赖项的设置方式使得我必须使用 PubSub 的 0.20.0-beta 版。此管道一直运行(无限制)。
[+] 问题:PubSub 消息每 30 分钟左右重复一次。
[+] 我尝试过的:我读过许多解决方案,其中提到数据流运行器如何具有确认发生的检查点。我还读过,使用 GroupByKey 等 PTransform 可以更快地确认这些消息。所以我尝试了窗口化、按键触发和分组,但我仍然从 PubSub 收到重复的消息。
[+] 问题:我究竟做错了什么?为什么消息没有被确认? (如果我理解正确,它不会在管道结束执行之前被确认??但我的管道需要很长时间,如何提前确认?)
这是特定于 0.20.0-beta 的“版本”错误,还是我应该能够将 PubsubIO.Reader 与窗口和触发一起使用以便更早确认?
[+]代码:
窗口时间为 10 秒,PubSub ack 截止时间为 60 秒。
.apply("Listen_To_PubSub", PubsubIO.readStrings().fromSubscription(subscription))
.apply("Windowing", Window.<String> into(window).triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeLimit)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
.apply("DeleteFromBQ", ParDo.of(new DeleteFromBQ()))
.apply("Mapping", ParDo.of(new Mapping()))
.apply("GroupByKey", GroupByKey.<String,String>create())
.apply("Acknowledge", ParDo.of(new Grouped()))
.apply("DoSomething1", ParDo.of(new DoSomething1()))
.apply("Flatten_Iterable", Flatten.iterables())
.apply("DoSomething2", ParDo.of(new DoSomething2()))
.apply("DoSomething3", ParDo.of(new DoSomething3()))
.apply("DoSomething4", ParDo.of(new DoSomething4()))
.apply("Write_To_BigQuery", BigQueryIO.writeTableRows()
.to(output)
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);
提前致谢!任何输入表示赞赏。
最佳答案
当您应用如此多的转换时,您似乎超过了 60 秒的确认期限。要查看需要多长时间,我建议使用 Logging Pipeline Messages .我认为您可能需要尽快移动确认。
您可以做的另一件事是使用更高的机器类型来更快地处理消息。
关于google-cloud-dataflow - 具体版本 : PubSub/Dataflow acknowledgement of unbounded data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51271594/
我遇到了一个小问题,请求您的帮助!使用 BeanIO 2.1 并处理固定长度文件,我当前正在尝试检索结构如下的记录 :28C:5n/5n ':28C:':修复 5 个数字(最多) '/':修复 5 个
我使用的 drscheme 来自: http://www.archlinux.org/packages/extra/x86_64/drscheme/ 我正在尝试使用教科书中的示例代码,但我不断收到“未
我正在尝试将一个整数参数 N 传递给 cake 并返回一个大小为 N 的 2 平方的列表(为了举例)。例如面包店:cake(3) => [4,4,4] 这是我迄今为止尝试过的: -module(bak
我正在使用 SLIME 来调试我的 Common Lisp 函数。在函数内部,我让它人为地发出错误信号(尝试“调试”——也许我应该单步执行),如下所示: (define-condition unkno
这让我陷入了困境(抱歉,我对 python 还是很陌生)感谢您提供任何形式的帮助。 错误 print Student.MostFrequent() TypeError: unbound method
自从我尝试将代码重构到不同的文件中以来,这个 Python 问题一直困扰着我。我有一个名为 object.py 的文件,其中的相关代码是: class Object: #this is a gener
需要一个关于如何将原始类型集合传递给未绑定(bind)函数/或操作以及如何返回原始类型集合的示例。 例如整数列表或数组。 这是一个简单的例子。 List GetEvenNumbers(List num
我使用 xsd.exe/out 从 XML 片段生成了一个 XSD 文件 它所做的是为某些元素创建 maxOccurs="unbounded" 属性。 如果我只希望该元素出现一次,而不是一个集合,我应
问题 为什么情况 2 会导致“未绑定(bind)占位符参数”,而情况 1 却可以? 案例1 val h: (Int => Int) = (x:Int) => { scala.util.Random.n
问题 为什么情况 2 会导致“未绑定(bind)占位符参数”,而情况 1 却可以? 案例1 val h: (Int => Int) = (x:Int) => { scala.util.Random.n
我试图在 Maven 下使用 wro4j(1.6.2 和 1.6.3-SNAPSHOT),但我遇到了 jquery.tablesorter.js 的问题插件。 在第 972 行(或附近)有一个 ecl
我想在一个时间窗口内尝试 avg() 聚合代码 select user_id,timestamp avg(y) over(range between '5 second' preceding and
我以前从未见过这个错误: TypeError:必须使用 test_imports 实例作为第一个参数调用未绑定(bind)方法 halt_listener()(取而代之的是 Queue 实例) 当我运
考虑以下示例 Simulink (Download example)系统: 输入是一个幅度和一个不断增加的角度,它将返回两个正弦,如预期的实部和虚部: 从实部和虚部计算幅度是没有问题的。获取域中的角度
对于我的高级高等计算类(class),我需要做一个项目,我的任务是制作太空入侵者,我以前从未使用过Python,到目前为止我对自己的进展非常满意,但是我最近遇到了一个我不知道的错误如何处理。代码如下。
我想创建一个python类来封装一些全局变量: class TestEnvironment(object): _on_out = None # ... @staticmetho
虽然最近将一个新项目导入到 eclipse 中,但有一次当我尝试添加 GROOVY_SUPPORT 库时,它会显示为“未绑定(bind)”并出现错误-X,如下所示: 它也没有像我预期的那样出现在包资源
如何在类定义后使类方法静态化?也就是说,为什么第三种情况会失败呢? >>> class b:... @staticmethod... def foo():... pass...>>> b.fo
我在完成项目后收到两条错误消息: 在解决构建路径错误之前无法构建项目。 未绑定(bind)的类路径容器:项目“method_test”中的“JRE 系统库 [OSGi/Minimum-1.2]” 我认
此代码显示未绑定(bind)异常,我尝试了所有组合..无法解决!! 整个代码是: 最佳答案 你需要这个 xmlns:android="htt
我是一名优秀的程序员,十分优秀!