gpt4 book ai didi

java - 在 Flink 中合并多个流加入

转载 作者:行者123 更新时间:2023-12-02 01:05:25 24 4
gpt4 key购买 nike

我有来自不同来源的三个不同的流(对象: Trade,MarketData, WeightAdj,它们唯一的共同点是“产品”。这是我的流。

交易流: tradeid、产品、执行

市场数据流:产品、市场数据

Computestream:产品、因素

我想使用 Flink 实现什么我想加入所有三个流并生成 Tuple3<Trade,MarketData,WeightAdj > 的最新值。这意味着每次如果这些流中的任何一个发出事件,我应该获得最新的 Tuple3<Trade,MarketData,WeightAdj>

我尝试使用“连接”功能加入这些流,然后使用 keyBy但如果发出 MarketData 或 WeightAdj 事件,则不会生成 Enriched 对象。

public static void main(String[] args) throws Exception {
// some code
tradeStream.connect(marketStream)
.keyBy(
new KeySelector<Trade, String>() {
@Override
public String getKey(Trade trd) throws Exception {
return trd.product;
}
}, new KeySelector<MarketData, String>() {
@Override
public String getKey(MarketData marketData)
throws Exception {
return marketData.product;
}
}

)
.flatMap(new JoinRichCoFlatMapFunction())
.connect(weightStream)
.keyBy(new KeySelector<Tuple2<Trade, MarketData>, String>() {
@Override
public String getKey(Tuple2<Trade, MarketData> trd) throws Exception {
return trd.f0.product;
}
}, new KeySelector<WeightAdj, String>() {
@Override
public String getKey(WeightAdj wght) throws Exception {
return wght.product;
}
})
.flatMap(new TupleWeightJionRichCoFlatMapFunction())
.print();
}

public static final class JoinRichCoFlatMapFunction extends RichCoFlatMapFunction<Trade, MarketData, Tuple2<Trade, MarketData>>{

private ValueState<Trade> trades;
private ValueState<MarketData> marketData;

@Override
public void open(Configuration config) {
trades = getRuntimeContext().getState(new ValueStateDescriptor<>("Trades", Trade.class));
marketData = getRuntimeContext().getState(new ValueStateDescriptor<>("MarketData", MarketData.class));
}

@Override
public void flatMap1(Trade trd,Collector<Tuple2<Trade, MarketData>> out) throws Exception {

MarketData mktData = marketData.value();
if (mktData != null) {
marketData.clear();
out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
} else {
trades.update(trd);;
}
}

@Override
public void flatMap2(MarketData mktData,Collector<Tuple2<Trade, MarketData>> out) throws Exception {

Trade trd = trades.value();
if (trd != null) {
trades.clear();
out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
} else {
marketData.update(mktData);;
}
}
}

public static final class TupleWeightJionRichCoFlatMapFunction extends RichCoFlatMapFunction<Tuple2<Trade, MarketData>, WeightAdj, Tuple3<Trade, MarketData, WeightAdj>>{

private ValueState<Tuple2<Trade, MarketData>> tradeMarketState;
private ValueState<WeightAdj> weightState;

@Override
public void open(Configuration config) {

TypeInformation<Tuple2<Trade, MarketData>> info = TypeInformation.of(new TypeHint<Tuple2<Trade, MarketData>>(){});
tradeMarketState = getRuntimeContext().getState(new ValueStateDescriptor<>("Trades", info));
weightState = getRuntimeContext().getState(new ValueStateDescriptor<>("Weights", WeightAdj.class));
}

@Override
public void flatMap1(Tuple2<Trade, MarketData> trdWithMaktData, Collector<Tuple3<Trade, MarketData, WeightAdj>> out)
throws Exception {

WeightAdj weigt = weightState.value();
if (weigt != null) {
weightState.clear();
out.collect(new Tuple3<Trade, MarketData, WeightAdj>(trdWithMaktData.f0, trdWithMaktData.f1, weigt));
} else {
tradeMarketState.update(trdWithMaktData);;
}
}

@Override
public void flatMap2(WeightAdj weightData,Collector<Tuple3<Trade, MarketData, WeightAdj>> out) throws Exception {

Tuple2<Trade, MarketData> trdWithMktData = tradeMarketState.value();
if (trdWithMktData != null) {
tradeMarketState.clear();
out.collect(new Tuple3<Trade, MarketData, WeightAdj>(trdWithMktData.f0, trdWithMktData.f1, weightData));
} else {
weightState.update(weightData);;
}
}
}

知道我做错了什么吗?

最佳答案

如果我正确理解了您的目标,那么有几点需要以不同的方式处理:

  • 不要对任何状态调用 clear(),因为您需要继续记住从三个流中的每一个流中看到的最后一个值。
  • 始终调用out.collect()。如果调用 flatmap1flatmap2,则意味着某些内容已更新,因此有新内容需要报告。

(看起来您正在模仿 Flink 训练中 RidesAndFares exercise 中使用的逻辑。在该练习中,要求有所不同:在这种情况下,需要组合一对 Ride 和 Fare 事件,一次性。找到给定rideId的乘车/票价对后,该rideId就完成了连接。)

现在有一些注意事项:

  • 如果您从不调用 clear() 并且乘积空间是无限的,那么您将无限期地保留不断增加的状态量。如果这是一个问题,您可以使用 state TTL安排清除陈旧状态。
  • 请记住,如果与 RocksDB 一起使用,Tuple 序列化程序无法处理 null。我很想按照以下方式重写每个平面 map 方法:
public void flatMap1(Trade trd, Collector<Tuple2<Trade, MarketData>> out) throws Exception {

trades.update(trd);;
MarketData mktData = marketData.value();
out.collect(new Tuple2<Trade, MarketData>(trd, mktData));
}

但是当应用程序启动时,这可能会生成一个 Tuple2,其中 mktData 为 null。因此,防止这种情况发生是个好主意。

正如 Arvid 提到的,Table/SQL API 使这些类型的联接变得简单。

关于java - 在 Flink 中合并多个流加入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60111236/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com