- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个在 Google Cloud Dataflow 上运行的 Apache Beam 管道。这是一个流管道,它从 Google Cloud PubSub 接收输入消息,这些消息基本上是要处理的元素的 JSON 数组。
粗略地说,管道有以下步骤:
PCollecttion<List<T>>
.PCollection<T>
.GroupByKey
(这是问题步骤):它转换 PCollection
回到Pcollection<List<T>>
但它不会等待所有元素。我无法获取最后一个GroupByKey
将收到的所有元素分组在一起。发布的消息不包含必须处理的元素,并且比跳到末尾的元素花费的时间更长。
我认为如果我可以编写自定义数据驱动触发器,这将很容易解决。或者即使我可以动态设置触发器 AfterPane.elementCountAtLeast()
来自定制WindowFn
。
我似乎无法制作自定义触发器。但是是否可以以某种方式动态地为每个窗口设置触发器?
--
这是我正在开发的管道的简化版本。
我简化了对象数组 T
的输入转换成一个简单的整数数组。我已经模拟了这些整数的键(或 ID)。通常它们是对象的一部分。
我还将缓慢的处理步骤(实际上是几个步骤)简化为具有人为延迟的单个步骤。
(完整示例要点 https://gist.github.com/naringas/bfc25bcf8e7aca69f74de719d75525f2 )
PCollection<String> queue = pipeline
.apply("ReadQueue", PubsubIO.readStrings().fromTopic(topic))
.apply(Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.standardSeconds(3))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes());
TupleTag<List<KV<Integer, Integer>>> tagDeserialized = new TupleTag<List<KV<Integer, Integer>>>() {};
TupleTag<Integer> tagDeserializeError = new TupleTag<Integer>() {};
PCollectionTuple imagesInputTuple = queue
.apply("DeserializeJSON", ParDo.of(new DeserializingFn()).withOutputTags(tagDeserialized, TupleTagList.of(tagDeserializeError)));
/*
This is where I think that I must adjust the custom window strategy, set the customized dynamic-trigger
*/
PCollection<KV<Integer, Integer>> images = imagesInputTuple.get(tagDeserialized)
/* I have tried many things
.apply(Window.<List<KV<Integer, Integer>>>into(new GlobalWindows()))
*/
.apply("Flatten into timestamp", ParDo.of(new DoFn<List<KV<Integer, Integer>>, KV<Integer, Integer>>() {
// Flatten and output into same ts
// like Flatten.Iterables() but I set the output window
@ProcessElement
public void processElement(@Element List<KV<Integer, Integer>> input, OutputReceiver<KV<Integer, Integer>> out, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
Instant timestamp = w.maxTimestamp();
for (KV<Integer, Integer> el : input) {
out.outputWithTimestamp(el, timestamp);
}
}
}))
.apply(Window.<KV<Integer, Integer>>into(new GlobalWindows()));
TupleTag<KV<Integer, Integer>> tagProcess = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagSkip = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple preproc = images
.apply("PreProcessingStep", ParDo.of(new SkipOrNotDoFn()).withOutputTags(tagProcess, TupleTagList.of(tagSkip)));
TupleTag<KV<Integer, Integer>> tagProcessed = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagError = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple processed = preproc.get(tagProcess)
.apply("ProcessingStep", ParDo.of(new DummyDelasyDoFn).withOutputTags(tagProcessed, TupleTagList.of(tagError)));
/* Here, at the "end"
the elements get grouped back
first: join into a PcollectionList and flatten it
second: GroupByKey which should but doesn't way for all elements
lastly: serilize and publish (in this case just print out)
*/
PCollection end = PCollectionList.of(preproc.get(tagSkip)).and(processed.get(tagProcessed))
.apply("FlattenUpsert", Flatten.pCollections())
//
.apply("GroupByParentId", GroupByKey.create())
.apply("GroupedValues", Values.create())
.apply("PublishSerialize", ParDo.of(
new DoFn<Object, String>() {
@ProcessElement
public void processElement(ProcessContext pc) {
String output = GSON.toJson(pc.element());
LOG.info("DONE: {}", output);
pc.output(output);
}
}));
// "send the string to pubsub" goes here
最佳答案
我玩了一下有状态管道。当您想使用数据驱动触发器或 AfterPane.elementCountAtLeast()
我假设您知道符合消息的元素数量(或者至少每个键不会改变),所以我定义了 NUM_ELEMENTS = 10
就我而言。
我的方法的主要思想是跟踪到目前为止我所看到的特定键的元素数量。请注意,我必须合并 PreProcessingStep
和ProcessingStep
合并为一个以进行准确计数。我知道这只是一个简化的示例,所以我不知道如何将其转化为实际场景。
在有状态的 ParDo 中,我定义了两个状态变量,一个 BagState
看到所有整数和 ValueState
计算错误数量:
// A state bag holding all elements seen for that key
@StateId("elements_seen")
private final StateSpec<BagState<Integer>> elementSpec =
StateSpecs.bag();
// A state cell holding error count
@StateId("errors")
private final StateSpec<ValueState<Integer>> errorSpec =
StateSpecs.value(VarIntCoder.of());
然后我们像往常一样处理每个元素,但我们不会输出任何内容,除非它是一个错误。在这种情况下,我们在将元素发送到tagError
之前更新错误计数器。侧面输出:
errors.write(firstNonNull(errors.read(), 0) + 1);
is_error = true;
output.get(tagError).output(input);
我们更新计数,对于成功处理或跳过的元素(即 !is_error
),将新观察到的元素写入 BagState
:
int count = firstNonNull(Iterables.size(state.read()), 0) + firstNonNull(errors.read(), 0);
if (!is_error) {
state.add(input.getValue());
count += 1;
}
然后,如果成功处理的元素和错误的总和等于 NUM_ELEMENTS
(我们在这里模拟数据驱动的触发器),我们刷新 BagState
中的所有项目。 :
if (count >= NUM_ELEMENTS) {
Iterable<Integer> all_elements = state.read();
Integer key = input.getKey();
for (Integer value : all_elements) {
output.get(tagProcessed).output(KV.of(key, value));
}
}
请注意,这里我们已经可以对值进行分组并仅发出一个 KV<Integer, Iterable<Integer>>
反而。我刚刚做了一个for
而是循环以避免更改下游的其他步骤。
这样,我发布了一条消息,例如:
gcloud pubsub topics publish streamdemo --message "[1,2,3,4,5,6,7,8,9,10]"
我之前得到的地方:
INFO: DONE: [4,8]
现在我得到:
INFO: DONE: [1,2,3,4,5,6,8,9,10]
元素7
不存在,因为它是模拟错误的。
使用 DirectRunner
进行测试和2.16.0 SDK。完整代码here .
请告诉我这是否适合您的用例,请记住我只做了一些小测试。
关于google-cloud-dataflow - 如何根据处理的元素数量动态触发窗口?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59061351/
这个问题已经有答案了: jQuery trigger click vs click ()? (3 个回答) 已关闭 5 年前。 我无法区分 trigger('click')与 trigger('cli
我正在运行 VS 2008 和 .NET 3.5 SP1。 我想在 HttpModule 中实现命中跟踪在我的 ASP.NET 应用程序中。很简单,我想。然而,BeginRequest我的事件 Htt
这是一段代码,我收到以下错误 #1064 - You have an error in your SQL syntax; check the manual that corresponds to yo
有没有办法用任意增量触发滚轮事件。就像 jQuery 对“点击”所做的那样: $('#selector').trigger('click'); 我需要类似的东西,只需一个滚轮即可: $('#selec
我正在尝试在配音数据库中触发时间。我想检查一下在不出现角色的电影配音中不能对角色进行配音。这是PDM: 和CDM 我是SQL的初学者,但我知道表“DUBBES”中应该有一些触发器。我试图做这样的事情,
这个问题已经有答案了: jquery programmatically click on new dom element (3 个回答) 已关闭 6 年前。 我有一个 jQuery 事件定义如下: $
主菜单的点击代码适用于类更改,但不适用于子菜单...当单击食物或鞋子等子菜单项时,它不会触发警报命令...事实上,悬停非常适合子菜单但不是活跃的 HTML
问题非常简单: $('#btn1').click(function(event){ alert( "pageX: " + event.pageX + "\npa
我使用 Spring 的调度程序 (@EnableScheduling) 并具有以下 @Scheduled 方法,该方法每分钟调用一次: @Component public class Schedul
错误 SQL 查询:文档 CREATE TRIGGER `triggers_div` AFTER INSERT ON `produits` FOR EACH ROW BEGIN INSERT INTO
我想在插入另一个表时填充表中的一些列值,并为特定列设置条件。我使用触发器: CREATE TRIGGER inserttrigger AFTER INSERT ON table1 FOR EACH R
我可以在 5.6 MySQL 环境中使用一些关于触发器的指导。我想创建一个触发器,如果发现具有相同速度的电脑的价格较低,则该触发器会停止更新。 架构是产品(制造商、型号、类型)PC(型号、速度、内
背景:我们有一个 completed_flag,默认为 0,当有人完成调查时更新为 1。我想记录这次更新发生的时间戳 在编写了这个触发器/函数以在标志从 0 触发到 1 时更新时间戳后,我怀疑我这样做
数据库中有两个表 KistStatus和 LastKistStatus .后者将保存 KistStatus 的所有“最新”值。 . KistStatus有大约 174.000 条记录,LastKist
我正在开发一个使用 APNS 的 iPhone 应用程序。我很清楚实现 APNS、创 build 备 token 的过程,等等等等……我不知道如何通过 Web 服务从提供商端触发和启动 APNS。任何
我有这个 javascript,当数量更改时会触发 update_cart... jQuery('div.woocommerce').on('change', '.qty', function
当我单击任何按钮时,click 事件不会被触发。艰难的是,我使用 $("div").on("click", "button", function () { 让它工作,但我想看到它使用 .class 工
如何在我的代码中触发 Android onCreateOptionsMenu 函数,即无需用户单击手机上的选项菜单按钮? 最佳答案 Activity.openOptionsMenu(); 就可以了 关
我将表单包装在 中然后我设置 list android:windowSoftInputMode="adjustResize" (默认 react native )。现在,当我用手指触摸事件手动聚焦一
我有一个 Android 编程问题。使用下面的代码我想验证一个字符串匹配。它验证正常,但 LogCat 显示 TextWatcher 方法在每次击键时触发两次,我不明白为什么。我希望每次击键只触发一次
我是一名优秀的程序员,十分优秀!