
java - How to join two streams in Apache Flink?


I'm getting started with Flink and am looking at one of the official tutorials.

As far as I understand, the goal of this exercise is to join the two streams on a time attribute.

Task:

The result of this exercise is a data stream of Tuple2 records, one for each distinct rideId. You should ignore the END events, and only join the event for the START of each ride with its corresponding fare data.

The resulting stream should be printed to standard out.

Question: How is the EnrichmentFunction able to join the two streams, i.e. how does it know which fare to join with which ride? I was expecting it to buffer multiple fares/rides until an incoming fare/ride finds its matching partner.

From my understanding, it just saves every ride/fare it sees and combines it with the next best ride/fare. Why is that a correct join?

The provided solution:

/*
* Copyright 2017 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dataartisans.flinktraining.solutions.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
* Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
* (http://training.data-artisans.com).
*
* The goal for this exercise is to enrich TaxiRides with fare information.
*
* Parameters:
* -rides path-to-input-file
* -fares path-to-input-file
*
*/
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}

Best answer

In the context of this particular training exercise on stateful enrichment, there are three events for each rideId value -- a TaxiRide START event, a TaxiRide END event, and a TaxiFare. The objective of this exercise is to connect each TaxiRide START event with the single TaxiFare event that has the same rideId -- in other words, to join the ride stream and the fare stream on rideId, knowing that there is exactly one of each.

This exercise demonstrates how keyed state works in Flink. Keyed state is effectively a sharded key-value store. When we have an item of ValueState, such as ValueState<TaxiRide> rideState, Flink stores a separate record in its state backend for each distinct value of the key (rideId).

Each time flatMap1 or flatMap2 is called, there is a key (a rideId) implicitly in context. When we call rideState.update(ride) or rideState.value(), we are not accessing a single variable but rather setting or getting an entry in that key-value store, using the rideId as the key.
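To make the "sharded key-value store" idea concrete, here is a minimal sketch that is not part of the exercise (the class name, state name, and stream shape are made up for illustration): a RichFlatMapFunction that counts events per key. Even though the code declares only one ValueState field, Flink keeps a separate count entry for every distinct key of the stream it is applied to after a keyBy.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical example: counts events per key using keyed ValueState.
public class PerKeyCounter extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private ValueState<Long> countState;

    @Override
    public void open(Configuration config) {
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> event, Collector<Tuple2<String, Long>> out) throws Exception {
        // value() reads the entry stored for the key of the current element
        Long count = countState.value();
        long newCount = (count == null) ? 1L : count + 1;
        // update() writes back under that same key
        countState.update(newCount);
        out.collect(Tuple2.of(event.f0, newCount));
    }
}

// Usage (assuming a DataStream<Tuple2<String, Long>> named events):
// events.keyBy(e -> e.f0).flatMap(new PerKeyCounter());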

In this exercise, both streams are keyed by rideId, so there is potentially one element of rideState and one element of fareState for every distinct rideId. The provided solution therefore buffers lots of rides and fares, but only one per rideId. (That is enough, because rides and fares are perfectly paired in this dataset.)

So, when you ask:

How is the EnrichmentFunction able to join the two streams aka. how does it know which fare to join with which ride?

the answer is:

It joins the fare having the same rideId.

The particular exercise you are asking about shows how to implement a simple enrichment join, in order to get across the ideas of keyed state and connected streams. But more complex joins are certainly possible with Flink. See the documentation on joins using the DataStream API, joins with Flink's Table API, and joins with Flink SQL.
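For instance, an event-time interval join in the DataStream API pairs each ride with fares whose timestamps fall within a bounded interval around it. The following is a rough sketch only, not part of the training solution: it reuses the rides and fares streams from the code above, assumes both streams carry event-time timestamps and watermarks, and the one-hour bounds are chosen purely for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Sketch: join rides and fares with the same rideId whose event timestamps
// are at most one hour apart.
DataStream<Tuple2<TaxiRide, TaxiFare>> joined = rides
        .keyBy(ride -> ride.rideId)
        .intervalJoin(fares.keyBy(fare -> fare.rideId))
        .between(Time.hours(-1), Time.hours(1))
        .process(new ProcessJoinFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>>() {
            @Override
            public void processElement(TaxiRide ride, TaxiFare fare, Context ctx,
                                       Collector<Tuple2<TaxiRide, TaxiFare>> out) {
                out.collect(new Tuple2<>(ride, fare));
            }
        });

Unlike the RichCoFlatMap solution, the interval join manages its own buffering and clears state once the time bounds have passed, at the cost of requiring event time and watermarks.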

Regarding "java - How to join two streams in Apache Flink?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54277910/
