gpt4 book ai didi

google-cloud-platform - 针对数据流中的 "table"加入流

转载 作者:行者123 更新时间:2023-12-03 22:33:38 25 4
gpt4 key购买 nike

让我用一个稍微做作的例子来解释我想要做什么。想象一下,我有一个交易流,股票代码、股票数量和价格:{ symbol = "GOOG", count = 30, price = 200 } .我想用股票的名称来丰富这些事件,在本例中为 "Google" .

为此,我想,内部数据流 , 维护一个由 PCollection<KV<String, String>> 更新的符号->名称映射的“表” ,并加入我与这张 table 的交易流,产生例如一个 PCollection<KV<Trade, String>> .

这似乎是流处理应用程序的一个彻底的基本用例,但我很难弄清楚如何在 Dataflow 中实现这一点。我知道这在 Kafka Streams 中是可能的。

请注意,我是 不是 想要使用外部数据库进行查找——我需要在 Dataflow 中解决这个问题或切换到 Kafka Streams。

最佳答案

我将描述两个选项。一种使用应与当前版本的 Dataflow (1.X) 一起使用的侧输入,另一种使用 DoFn 中的状态这应该是即将到来的 Dataflow (2.X) 的一部分。

Dataflow 1.X 的解决方案,使用侧输入

这里的总体思路是使用映射值 side-input使符号->名称映射对所有工作人员可用。

该表需要在全局窗口中(因此不会过期),需要触发每个元素(或者您希望生成新更新的频率),并在所有触发中累积元素。它还需要一些逻辑来获取每个符号的最新名称。

这种解决方案的缺点是每次有新条目进入时都会重新生成整个查找表,并且不会立即推送给所有工作人员。相反,每个人都将在 future “某个时候”获得新的映射。

在高层次上,这个管道可能看起来像(我没有测试过这段代码,所以可能有一些类型):

PCollection<KV<Symbol, Name>> symbolToNameInput = ...;
final PCollectionView<Map<Symbol, Iterable<Name>>> symbolToNames = symbolToNameInput
.apply(Window.into(GlobalWindows.of())
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))
.accumulatingFiredPanes())
.apply(View.asMultiMap())

请注意,我们必须使用 viewAsMultiMap这里。这意味着我们实际上为每个符号建立了所有名称。当我们查找内容时,我们需要确保在可迭代对象中采用最新的名称。

PCollection<Detail> symbolDetails = ...;
symbolDetails
.apply(ParDo.withSideInputs(symbolToNames).of(new DoFn<Detail, AugmentedDetails>() {
@Override
public void processElement(ProcessContext c) {
Iterable<Name> names = c.sideInput(symbolToNames).get(c.element().symbol());
Name name = chooseName(names);
c.output(augmentDetails(c.element(), name));
}
}));

使用 State API 的 Dataflow 2.X 解决方案

该解决方案使用了一项新功能,该功能将成为即将发布的 Dataflow 2.0 版本的一部分。它还不是预览版的一部分(目前是 Dataflow 2.0-beta1),但您可以观看 release notes看看它什么时候可用。

一般的想法是键控状态允许我们存储一些与特定键相关的值。在这种情况下,我们将记住我们看到的最新的“名称”值。

在运行有状态的 DoFn 之前我们将把每个元素包装成一个通用元素类型( NameOrDetails)对象。这将类似于以下内容:

// Convert SymbolToName entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> left = symbolToName
.apply(ParDo.of(new DoFn<SymbolToName, KV<Symbol, NameOrDetails>>() {
@ProcessElement
public void processElement(ProcessContext c) {
SymbolToName e = c.element();
c.output(KV.of(e.getSymbol(), NameOrDetails.name(e.getName())));
}
});

// Convert detailed entries to KV<Symbol, NameOrDetails>
PCollection<KV<Symbol, NameOrDetails>> right = details
.apply(ParDo.of(new DoFn<Details, KV<Symbol, NameOrDetails>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Details e = c.element();
c.output(KV.of(e.getSymobl(), NameOrDetails.details(e)));
}
});

// Flatten the two streams together
PCollectionList.of(left).and(right)
.apply(Flatten.create())
.apply(ParDo.of(new DoFn<KV<Symbol, NameOrDetails>, AugmentedDetails>() {
@StateId("name")
private final StateSpec<ValueState<String>> nameSpec =
StateSpecs.value(StringUtf8Coder.of());

@ProcessElement
public void processElement(ProcessContext c
@StateId("name") ValueState<String> nameState) {
NameOrValue e = c.element().getValue();
if (e.isName()) {
nameState.write(e.getName());
} else {
String name = nameState.read();
if (name == null) {
// Use symbol if we haven't received a mapping yet.
name = c.element().getKey();
}
c.output(e.getDetails().withName(name));
}
});

关于google-cloud-platform - 针对数据流中的 "table"加入流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41570276/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com