- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在开始使用 flink 并查看 one of the official tutorials 。
据我了解,此练习的目标是在时间属性上连接两个流。
任务:
The result of this exercise is a data stream of Tuple2 records, one for each distinct rideId. You should ignore the END events, and only join the event for the START of each ride with its corresponding fare data.
The resulting stream should be printed to standard out.
问题:EnrichmentFunction 如何能够连接两个流(又名)。它如何知道参加哪个展会和哪个骑行?我希望它能够缓冲多个展会/游乐设施,直到传入的展会/游乐设施有匹配的合作伙伴。
根据我的理解,它只是保存它看到的每一次骑行/展会,并将其与下一个最佳骑行/展会结合起来。为什么这是正确的加入?
提供的解决方案:
/*
* Copyright 2017 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dataartisans.flinktraining.solutions.datastream_java.state;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
/**
* Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
* (http://training.data-artisans.com).
*
* The goal for this exercise is to enrich TaxiRides with fare information.
*
* Parameters:
* -rides path-to-input-file
* -fares path-to-input-file
*
*/
public class RidesAndFaresSolution extends ExerciseBase {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String ridesFile = params.get("rides", pathToRideData);
final String faresFile = params.get("fares", pathToFareData);
final int delay = 60; // at most 60 seconds of delay
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter((TaxiRide ride) -> ride.isStart)
.keyBy("rideId");
DataStream<TaxiFare> fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
.keyBy("rideId");
DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
.connect(fares)
.flatMap(new EnrichmentFunction());
printOrTest(enrichedRides);
env.execute("Join Rides with Fares (java RichCoFlatMap)");
}
public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
// keyed, managed state
private ValueState<TaxiRide> rideState;
private ValueState<TaxiFare> fareState;
@Override
public void open(Configuration config) {
rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
}
@Override
public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
out.collect(new Tuple2(ride, fare));
} else {
rideState.update(ride);
}
}
@Override
public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiRide ride = rideState.value();
if (ride != null) {
rideState.clear();
out.collect(new Tuple2(ride, fare));
} else {
fareState.update(fare);
}
}
}
}
最佳答案
在这个特定的背景下training exercise on stateful enrichment ,每个rideId 值都有三个事件——TaxiRide 开始事件、TaxiRide 结束事件和TaxiFare。此练习的目标是将每个 TaxiRide 开始事件与具有相同rideId 的一个 TaxiFare 事件连接起来,或者换句话说,加入rideId 上的乘车流和票价流,同时知道两者只有一个。
此练习演示了 Flink 中键控状态的工作原理。键控状态实际上是一个分片键值存储。当我们有 ValueState
的项目时,如ValueState<TaxiRide> rideState
,Flink 将为每个不同的键值( rideId
)在其状态后端存储一条单独的记录。
每次flatMap1
和flatMap2
被调用时,上下文中隐式存在一个键(a rideId
),当我们调用 rideState.update(ride)
时或rideState.value()
我们不是访问单个变量,而是使用 rideId
设置和获取键值存储中的条目。作为 key 。
在本练习中,两个流均以 rideId
作为键控。 ,因此 rideState
中可能存在一个元素和 fareState
的一个元素对于每个不同的 rideId
。因此,所提供的解决方案是缓冲大量的乘车和票价,但每个rideId
只有一个。 (这已经足够了,因为该数据集中的游乐设施和票价完美匹配)。
所以,你问:
How is the EnrichmentFunction able to join the two streams aka. how does it know which fare to join with which ride?
答案是
It joins the fare having the same
rideId
.
您询问的这个特定练习展示了如何实现简单的丰富连接,以便理解键控状态和连接流的想法。但使用 Flink 肯定可以实现更复杂的连接。请参阅 joins using the DataStream API 上的文档, joins with Flink's Table API ,和joins with Flink SQL .
关于java - 如何在 apache flink 中加入两个流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54277910/
我似乎对 git 存储库有权限问题。 当我 pull 入一个不是我的 Linux 用户创建的目录时,我出现了这个错误。 fatal: Unable to create '/home/---/.git/
在 Git 中,您可以将给定目录克隆到给定目录: git clone ssh://gitolite@dev.bipper.com:3687/com/bipper/kids/portal 当我运行我们
目前,如果您在分支 V2 中并执行“git pull origin V3”,它会将 V3 merge 到 V2,甚至不会发出警告或提示。这个选项可以以某种方式被阻止吗?我在这里阅读了所有类似的问题,人
我刚开始使用 Oracle 的 Coherence 缓存,我注意到这一点:如果我在缓存中放入一个 ConcurrentHashMap 对象,当我检索它时,我可以看到它被转换为一个普通的 HashMap
看起来我缺少对 git pull 和 git commit 的基本理解,假设我在分支上工作,而它在我更新时被其他开发人员更新了在本地做我的工作。我应该在发出 git pull 之前提交更改,还是应该执
好的。所以我以为我已经舔过了……但现在…… 我有一个项目,其中包含一个来自 GitHub 的小型库作为子模块。在该 super 项目的原始版本中,子模块按预期工作。 但是,我只是克隆了 super 项
使用 Visual Studio Code 中的内置 Git,我看不到将指定的远程分支 pull 入当前分支的方法。我可以这样做吗? 示例:我正在分支 myBranch 上工作,更改已 merge 到
当我尝试提交或 pull 此错误时 Bus error (core dumped) 发生了! 当我用 gdb 调试它时,(gdb git,run commit -a,where) 结果是: mucul
我对默认 Rails Rake 任务的预期用途有点困惑,想咨询一下我是否应该使用 db:reset或编写自定义 Rake 任务。没什么聪明的,只是日常管理,而且我很可能会错过一个明显的文档,因为我是
所以我做了: git reset --hard #commithash # make a bunch of changes, fixes and so on. git add -A git commi
我已使用以下命令成功部署到 firebase 托管应用: firebase init firebase deploy 在这个阶段,我正在执行 git pull 以将 repo 下 pull 到暂存服务
当尝试在 Eclipse 的 git 存储库中 pull (团队|从上下文菜单中 pull )时,出现 Could not get advertised Ref for branch refs/hea
我是一名优秀的程序员,十分优秀!