- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s,计算5秒内,每个用户的订单总金额
并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor
Apache Flink 1.12 Documentation: Generating Watermarks
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Author ddkk.com 弟弟快看,程序员编程资料站
* Desc
* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
*/
public class WatermakerDemo01_Develop {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//模拟实时订单数据(数据有延迟和乱序)
DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
String orderId = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(100);
//模拟数据延迟和乱序!
long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
ctx.collect(new Order(orderId, userId, money, eventTime));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
flag = false;
}
});
//3.Transformation
//-告诉Flink要基于事件时间来计算!
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
//-告诉Flnk数据中的哪一列是事件时间,因为Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
/*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允许的延迟时间或乱序时间
@Override
public long extractTimestamp(Order element) {
return element.eventTime;
//指定事件时间是哪一列,Flink底层会自动计算:
//Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
}
});*/
DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
DataStream<Order> result = watermakerDS
.keyBy(Order::getUserId)
//.timeWindow(Time.seconds(5), Time.seconds(5))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("money");
//4.Sink
result.print();
//5.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Author ddkk.com 弟弟快看,程序员编程资料站
* Desc
* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
*/
public class WatermakerDemo02_Check {
public static void main(String[] args) throws Exception {
FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//模拟实时订单数据(数据有延迟和乱序)
DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
private boolean flag = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (flag) {
String orderId = UUID.randomUUID().toString();
int userId = random.nextInt(3);
int money = random.nextInt(100);
//模拟数据延迟和乱序!
long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
System.out.println("发送的数据为: "+userId + " : " + df.format(eventTime));
ctx.collect(new Order(orderId, userId, money, eventTime));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
flag = false;
}
});
//3.Transformation
/*DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);*/
//开发中直接使用上面的即可
//学习测试时可以自己实现
DataStream<Order> watermakerDS = orderDS
.assignTimestampsAndWatermarks(
new WatermarkStrategy<Order>() {
@Override
public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Order>() {
private int userId = 0;
private long eventTime = 0L;
private final long outOfOrdernessMillis = 3000;
private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
@Override
public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
userId = event.userId;
eventTime = event.eventTime;
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
//Watermaker = 当前最大事件时间 - 最大允许的延迟时间或乱序时间
Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));
output.emitWatermark(watermark);
}
};
}
}.withTimestampAssigner((event, timestamp) -> event.getEventTime())
);
//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
/* DataStream<Order> result = watermakerDS
.keyBy(Order::getUserId)
//.timeWindow(Time.seconds(5), Time.seconds(5))
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("money");*/
//开发中使用上面的代码进行业务计算即可
//学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermaker时间
DataStream<String> result = watermakerDS
.keyBy(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//把apply中的函数应用在窗口中的数据上
//WindowFunction<IN, OUT, KEY, W extends Window>
.apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
@Override
public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
//准备一个集合用来存放属于该窗口的数据的事件时间
List<String> eventTimeList = new ArrayList<>();
for (Order order : input) {
Long eventTime = order.eventTime;
eventTimeList.add(df.format(eventTime));
}
String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",
key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
out.collect(outStr);
}
});
//4.Sink
result.print();
//5.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
import com.ouyang.bean.SensorReading
import com.ouyang.customfunction.MyMapToSensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @ Date:2020/12/4
* @ Author:yangshibiao
* @ Desc:Flink中的Watermark
*/
object WatermarkDemo {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可
env.getConfig.setAutoWatermarkInterval(5000)
val sensorStream: DataStream[SensorReading] = env
.readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
.map(new MyMapToSensorReading)
// 1、引入Watermark(使用已有的类)
// 1.1、给一个没有乱序,时间为升序的流设置一个EventTime
val ascendingStream: DataStream[SensorReading] = sensorStream.assignAscendingTimestamps(_.timestamp)
// 1.2、当流中存在时间乱序问题,引入watermark,并设置延迟时间
/**
* 知识点:
* 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型
* 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间)
* 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回
* 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便
*/
val watermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timestamp * 1000
}
})
// 2、使用 TimestampAssigner 引入 Watermark
// 2.1、Assigner with periodic watermarks(周期性引入watermark)
/**
* 知识点:
* 1、系统会周期性的将watermark插入到流中,默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置,单位为毫秒
* 2、产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,如果大于流中最大watermark就插入,小于就不插入
* 3、如下,可以自定义一个周期性的时间戳抽取(需要实现 AssignerWithPeriodicWatermarks 接口)
*/
env.getConfig.setAutoWatermarkInterval(5000)
val periodicWatermarkStream: DataStream[SensorReading] = sensorStream.assignTimestampsAndWatermarks(new MyPeriodicAssigner(10))
env.execute("WatermarkDemo")
}
}
/**
* 自定义一个周期生成watermark的类
* @param bound watermark的延时时间(毫秒)
*/
class MyPeriodicAssigner(bound: Long) extends AssignerWithPeriodicWatermarks[SensorReading] {
// 当前为止的最大时间戳(毫秒)
var maxTs: Long = Long.MinValue
/**
* 获取当前的watermark(默认200毫秒获取一次,可以通过 env.getConfig.setAutoWatermarkInterval(5000) 来设置)
* @return 当前watermark,当前最大时间戳 - 延时时间
*/
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
/**
* 指定eventTime对应的字段(流中每条数据都会调用一次此方法)
* @param element 流中的每条数据
* @param previousElementTimestamp 无
* @return 当前流的eventTime(单位:毫秒)
*/
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
// 每条数据都获取其中的时间戳,跟最大时间戳取大,并重新赋值给最大时间戳
maxTs = maxTs.max(element.timestamp * 1000)
element.timestamp * 1000
}
}
这个问题在这里已经有了答案: Oracle: merging two different queries into one, LIKE & IN (1 个回答) 8年前关闭。 我有以下代码: case
我查阅过此页面:http://dev.mysql.com/doc/refman/5.1/en/case.html以及这个,但无法获得一个简单的程序来工作...... 更新:为了明确我想要做什么:我想从
有什么办法可以优化下面的查询吗? SELECT DATE_FORMAT(a.duedate,'%d-%b-%y') AS dte, duedate, SUM(CASE WHEN (typeofnoti
我进退两难,以下 SQL 查询的结果是什么以及它是如何工作的: SELECT ... CASE WHEN (a.FIELD=1 AND b.FIELD=2) THEN 1 WHEN
问题:输入年,月,打印对应年月的日历。 示例: 问题分析: 1,首先1970年是Unix系统诞生的时间,1970年成为Unix的元年,1970年1月1号是星期四,现在大多的手机的日历功能只能显
**摘要:**介绍了Angular中依赖注入是如何查找依赖,如何配置提供商,如何用限定和过滤作用的装饰器拿到想要的实例,进一步通过N个案例分析如何结合依赖注入的知识点来解决开发编程中会遇到的问题。 本
我想拥有自动伴侣类apply case 类的构造函数来为我执行隐式转换,但无法弄清楚如何这样做。我到处搜索,我能找到的最接近的答案是 this问题(我将解释为什么它不是我在下面寻找的)。 我有一个看起
您好,我已经浏览了“多列案例”问题,但没有看到与此相同的内容,所以我想我应该问一下。 基本上我有两个我想要连接的表(都是子查询的结果)。它们具有相同的列名称。如果我加入他们的 ID 和 SELECT
我发现了一些类型推断的非直觉行为。因此,语义等效代码的工作方式不同,具体取决于编译器推断出的有关函数返回类型的信息。当您在最小单元测试中重现此案例时,或多或少会清楚发生了什么。但我担心在编写框架代码时
CREATE TABLE test ( sts_id int , [status1] int , [status2] int , [status3] int , [status4] int ) INS
我有以下声明: SELECT Dag AS Dag, CASE Jaar WHEN 2013 THEN Levering END AS '2013', CASE
我想做的是为所有高于平均时间、平均时间和低于平均时间的游乐设施获取平均tip_portion。所以返回3行。当我运行它时,它显示: ERROR: missing FROM-clause entry
我正在尝试设置一个包含以下字段的报告: 非常需要报告来显示日期、该日期内的总记录(因此我按日期分组),然后按小时计算 12 小时工作日(从上午 8 点到晚上 8 点)我需要计算记录在这些时间内出现的时
我有这个查询 SELECT users.name FROM users LEFT JOIN weapon_stats ON users.id = weapon_stats.zp_id WHERE we
我正在尝试按收视率等级获取不同视频的计数。我有下表: vid_id views 1 6 1 10 1 900 2 850 2 125000
假设我有一个如下所示的 SQL 语句: select supplier, case when platform in (5,6) then 'mobile' when p
我有一个表测试 TestNumber (int primary key) InactiveBitwise (int) 我执行以下命令: UPDATE tests SET CASE WH
我有一个像这样的表(name=expense): id amount date 1 -1687 2014-01-02 00:00:00.0 2 11000 2014-01-02 0
我有一个 multimap 定义 typedef std::pair au_pair; //vertices typedef std::pair acq_pair; //ch qlty specifi
我有一个有点像枚举的类,它的每个实例都有一个唯一的 int 值,该值从 0 开始并在每个新实例时递增。 class MyEnumLikeClass { static int NextId =
我是一名优秀的程序员,十分优秀!