gpt4 book ai didi

google-cloud-dataflow - Dataflow sideInput 是否可以通过读取 gcs 桶来更新每个窗口?

转载 作者:行者123 更新时间:2023-12-02 00:49:18 25 4
gpt4 key购买 nike

我目前正在创建一个 PCollectionView,方法是从 gcs 存储桶中读取过滤信息并将其作为侧输入传递到我的管道的不同阶段以过滤输出。如果 gcs 存储桶中的文件发生变化,我希望当前运行的管道使用这个新的过滤器信息。如果我的过滤器发生变化,是否有办法在每个新数据窗口上更新此 PCollectionView?我以为我可以在 startBundle 中做到这一点,但我不知道如何或是否可能。如果可能的话,你能举个例子吗?

PCollectionView<Map<String, TagObject>> 
tagMapView =
pipeline.apply(TextIO.Read.named("TagListTextRead")
.from("gs://tag-list-bucket/tag-list.json"))
.apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
.apply("MakeTagMapView", View.asSingleton());
PCollection<String>
windowedData =
pipeline.apply(PubsubIO.Read.topic("myTopic"))
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardSeconds(31))));
PCollection<MY_DATA>
lineData = windowedData
.apply(ParDo.named("ExtractJsonObject")
.withSideInputs(tagMapView)
.of(new ExtractJsonObjectFn()));

最佳答案

您可能想要“使用最多 1 分钟前版本的过滤器作为辅助输入”之类的东西(因为理论上文件可以频繁地、不可预测地并且独立于您的管道更改——所以没有办法真正使文件的更改与管道的行为完全同步)。

这是我能够想出的(当然,相当笨拙的)解决方案。它依赖于侧面输入也隐式地由窗口键入的事实。在此解决方案中,我们将创建一个窗口化为 1 分钟固定窗口的侧输入,其中每个窗口将包含标签映射的单个值,该值从该窗口内某个时刻的过滤器文件派生而来。

PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Count.globally());

// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
.apply(MapElements.via((Long ignored) -> {
... manually read the json file as a TagMap ...
}))
.apply(View.asSingleton());

这种模式(加入缓慢变化的外部数据作为辅助输入)反复出现,我在这里提出的解决方案远非完美,我希望我们在编程模型中对此有更好的支持。我已经提交了 BEAM JIRA issue跟踪这个。

关于google-cloud-dataflow - Dataflow sideInput 是否可以通过读取 gcs 桶来更新每个窗口?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41254028/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com