- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个在 Google Cloud Dataflow 上运行的 Apache Beam 管道。这是一个流管道,它从 Google Cloud PubSub 接收输入消息,这些消息基本上是要处理的元素的 JSON 数组。
粗略地说,管道有以下步骤:
PCollecttion<List<T>>
.PCollection<T>
.GroupByKey
(这是问题步骤):它转换 PCollection
回到Pcollection<List<T>>
但它不会等待所有元素。我无法获取最后一个GroupByKey
将收到的所有元素分组在一起。发布的消息不包含必须处理的元素,并且比跳到末尾的元素花费的时间更长。
我认为如果我可以编写自定义数据驱动触发器,这将很容易解决。或者即使我可以动态设置触发器 AfterPane.elementCountAtLeast()
来自定制WindowFn
。
我似乎无法制作自定义触发器。但是是否可以以某种方式动态地为每个窗口设置触发器?
--
这是我正在开发的管道的简化版本。
我简化了对象数组 T
的输入转换成一个简单的整数数组。我已经模拟了这些整数的键(或 ID)。通常它们是对象的一部分。
我还将缓慢的处理步骤(实际上是几个步骤)简化为具有人为延迟的单个步骤。
(完整示例要点 https://gist.github.com/naringas/bfc25bcf8e7aca69f74de719d75525f2 )
PCollection<String> queue = pipeline
.apply("ReadQueue", PubsubIO.readStrings().fromTopic(topic))
.apply(Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.standardSeconds(3))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes());
TupleTag<List<KV<Integer, Integer>>> tagDeserialized = new TupleTag<List<KV<Integer, Integer>>>() {};
TupleTag<Integer> tagDeserializeError = new TupleTag<Integer>() {};
PCollectionTuple imagesInputTuple = queue
.apply("DeserializeJSON", ParDo.of(new DeserializingFn()).withOutputTags(tagDeserialized, TupleTagList.of(tagDeserializeError)));
/*
This is where I think that I must adjust the custom window strategy, set the customized dynamic-trigger
*/
PCollection<KV<Integer, Integer>> images = imagesInputTuple.get(tagDeserialized)
/* I have tried many things
.apply(Window.<List<KV<Integer, Integer>>>into(new GlobalWindows()))
*/
.apply("Flatten into timestamp", ParDo.of(new DoFn<List<KV<Integer, Integer>>, KV<Integer, Integer>>() {
// Flatten and output into same ts
// like Flatten.Iterables() but I set the output window
@ProcessElement
public void processElement(@Element List<KV<Integer, Integer>> input, OutputReceiver<KV<Integer, Integer>> out, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
Instant timestamp = w.maxTimestamp();
for (KV<Integer, Integer> el : input) {
out.outputWithTimestamp(el, timestamp);
}
}
}))
.apply(Window.<KV<Integer, Integer>>into(new GlobalWindows()));
TupleTag<KV<Integer, Integer>> tagProcess = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagSkip = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple preproc = images
.apply("PreProcessingStep", ParDo.of(new SkipOrNotDoFn()).withOutputTags(tagProcess, TupleTagList.of(tagSkip)));
TupleTag<KV<Integer, Integer>> tagProcessed = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagError = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple processed = preproc.get(tagProcess)
.apply("ProcessingStep", ParDo.of(new DummyDelasyDoFn).withOutputTags(tagProcessed, TupleTagList.of(tagError)));
/* Here, at the "end"
the elements get grouped back
first: join into a PcollectionList and flatten it
second: GroupByKey which should but doesn't way for all elements
lastly: serilize and publish (in this case just print out)
*/
PCollection end = PCollectionList.of(preproc.get(tagSkip)).and(processed.get(tagProcessed))
.apply("FlattenUpsert", Flatten.pCollections())
//
.apply("GroupByParentId", GroupByKey.create())
.apply("GroupedValues", Values.create())
.apply("PublishSerialize", ParDo.of(
new DoFn<Object, String>() {
@ProcessElement
public void processElement(ProcessContext pc) {
String output = GSON.toJson(pc.element());
LOG.info("DONE: {}", output);
pc.output(output);
}
}));
// "send the string to pubsub" goes here
最佳答案
我玩了一下有状态管道。当您想使用数据驱动触发器或 AfterPane.elementCountAtLeast()
我假设您知道符合消息的元素数量(或者至少每个键不会改变),所以我定义了 NUM_ELEMENTS = 10
就我而言。
我的方法的主要思想是跟踪到目前为止我所看到的特定键的元素数量。请注意,我必须合并 PreProcessingStep
和ProcessingStep
合并为一个以进行准确计数。我知道这只是一个简化的示例,所以我不知道如何将其转化为实际场景。
在有状态的 ParDo 中,我定义了两个状态变量,一个 BagState
看到所有整数和 ValueState
计算错误数量:
// A state bag holding all elements seen for that key
@StateId("elements_seen")
private final StateSpec<BagState<Integer>> elementSpec =
StateSpecs.bag();
// A state cell holding error count
@StateId("errors")
private final StateSpec<ValueState<Integer>> errorSpec =
StateSpecs.value(VarIntCoder.of());
然后我们像往常一样处理每个元素,但我们不会输出任何内容,除非它是一个错误。在这种情况下,我们在将元素发送到tagError
之前更新错误计数器。侧面输出:
errors.write(firstNonNull(errors.read(), 0) + 1);
is_error = true;
output.get(tagError).output(input);
我们更新计数,对于成功处理或跳过的元素(即 !is_error
),将新观察到的元素写入 BagState
:
int count = firstNonNull(Iterables.size(state.read()), 0) + firstNonNull(errors.read(), 0);
if (!is_error) {
state.add(input.getValue());
count += 1;
}
然后,如果成功处理的元素和错误的总和等于 NUM_ELEMENTS
(我们在这里模拟数据驱动的触发器),我们刷新 BagState
中的所有项目。 :
if (count >= NUM_ELEMENTS) {
Iterable<Integer> all_elements = state.read();
Integer key = input.getKey();
for (Integer value : all_elements) {
output.get(tagProcessed).output(KV.of(key, value));
}
}
请注意,这里我们已经可以对值进行分组并仅发出一个 KV<Integer, Iterable<Integer>>
反而。我刚刚做了一个for
而是循环以避免更改下游的其他步骤。
这样,我发布了一条消息,例如:
gcloud pubsub topics publish streamdemo --message "[1,2,3,4,5,6,7,8,9,10]"
我之前得到的地方:
INFO: DONE: [4,8]
现在我得到:
INFO: DONE: [1,2,3,4,5,6,8,9,10]
元素7
不存在,因为它是模拟错误的。
使用 DirectRunner
进行测试和2.16.0 SDK。完整代码here .
请告诉我这是否适合您的用例,请记住我只做了一些小测试。
关于google-cloud-dataflow - 如何根据处理的元素数量动态触发窗口?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59061351/
我有一个 Cloud Run 服务,它通过 SQLAlchemy 访问 Cloud SQL 实例.但是,在 Cloud Run 的日志中,我看到 CloudSQL connection failed.
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 4年前关闭。 Improve t
在将 docker 容器镜像部署到 Cloud Run 时,我可以选择一个区域,这很好。 Cloud Run 将构建委托(delegate)给 Cloud Build,后者显然会创建两个存储桶来实现这
我正在尝试将 Cloud Functions 用作由 PubSub 触发的异步后台工作程序,并进行更长时间的工作(以分钟为单位)。完整代码在这里https://github.com/zdenulo/c
这是/etc/cloud/cloud.cfg的内容Ubuntu云16.04镜像: # The top level settings are used as module # and system co
如何从 Google Cloud Function 启动 Cloud Dataflow 作业?我想使用 Google Cloud Functions 作为启用跨服务组合的机制。 最佳答案 我已经包含了
我想使用 Cloud Shell 在我的第二代 Cloud Sql 实例上运行数据库迁移。 我找到了一个 example in the docs关于如何使用 gcloud 进行连接.但是当我运行命令时
我正在尝试使用 Google Cloud PubSub和我的 Google Cloud Dataproc群集,我收到如下身份验证范围错误: { "code" : 403, "errors" :
这是我的用例。 我已经有一个以私有(private)模式部署的 Cloud Run 服务。 (与云功能相同的问题) 我正在开发使用此 Cloud Run 的新服务。我在应用程序中使用默认凭据进行身份验
如何连接到 Cloud SQL 上的数据库,而无需在容器中添加我的凭据文件? 最佳答案 使用 UNIX 域套接字 (Java) 从云运行(完全托管)连接到云 SQL At this time Clou
我有一个google-cloud-ml作业,需要从gs存储桶加载numpy .npz文件。我遵循了this example上关于如何从gs加载.npy文件的操作,但是由于.npz文件已压缩,因此它对我
我想创建链接到另一个项目中的 Cloud Source Repository 的 Cloud Build 触发器。但是当我在应该选择存储库的步骤中时,列表是空的。我尝试了不同的许可,但没有运气。谁能告
向 Twilio 发送 SMS 时,Twilio 会向指定的 URL 发送多个请求,以通过 Webhook 提供该 SMS 传送的状态。我想让这个回调异步,所以我开发了一个 Cloud Functio
我需要更改我的项目 ID,因为要验证的 Firebase 身份验证链接在链接上显示了项目 ID,并且由于品牌 reshape ,项目名称已更改。根据我发现的信息,更改项目 ID 似乎不太可能。我正在考
用于部署我的 Angular 应用程序的 CI/CD 管道已关闭,但我看到 Google Cloud Run 在容器镜像更新后没有部署新修订版。 我已将 Cloud Build 设置为在 GitHub
报价https://cloud.google.com/load-balancing/docs/https/setting-up-https-serverless#enabling While Goog
Cloud Spanner 提供了两种不同的 API。 Cloud Spanner 读取与 Cloud Spanner SQL API 之间有什么区别? 最佳答案 在幕后,它们都使用相同的执行机制,因
我是 GCP 堆栈的新手,所以我对用于存储数据的 GCP 技术数量感到非常困惑: https://cloud.google.com/products/storage 虽然上面的文章中没有提到googl
我发现 Google Cloud Functions 的网络出站费用令人惊讶,我正在尝试了解发生这种情况的原因以及如何避免这种情况。 Stackdriver 监控表明有问题的函数是我的 ingest
我使用 Prisma使用 Cloud Run 和 Cloud SQL。在向 prisma.schema 提供 DATABASE_URL 后,它会在运行时抛出一个错误。 Can't reach data
我是一名优秀的程序员,十分优秀!