- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
基本需求
1、 用户在短时间内频繁登录失败,有程序恶意攻击的可能;
2、 同一用户(可以是不同IP)在2秒内连续两次登录失败,需要报警;
解决思路
1、 将用户的登录失败行为存入ListState,设定定时器2秒后触发,查看ListState中有几次失败登录;
2、 更加精确的检测,可以使用CEP库实现事件流的模式匹配;
pom文件配置如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<!-- Table API 和 Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.10.1</version>
</dependency>
LoginEvent
private Long userId;
private String ip;
private String loginState;
private Long timestamp;
LoginFailWarning
private Long userId;
private Long firstFailTime;
private Long lastFailTime;
private String warningMsg;
代码:
package com.zqs.flink.project.loginfail_detect;
/**
* @remark 登陆失败监控
*/
import com.zqs.flink.project.loginfail_detect.beans.LoginFailWarning;
import com.zqs.flink.project.loginfail_detect.beans.LoginEvent;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
public class LoginFail {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1. 从文件中读取
URL resource = LoginFail.class.getResource("/LoginLog.csv");
DataStream<LoginEvent> loginEventStream= env.readTextFile(resource.getPath())
.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp()*1000L;
}
});
// 自定义处理函数检测连续登录失败事件
SingleOutputStreamOperator<LoginFailWarning> warningStream = loginEventStream
.keyBy(LoginEvent::getUserId)
.process(new LoginFailDetectWarning(2));
warningStream.print();
env.execute("login fail detect job");
}
// 实现自定义KeyedProcessFunction
public static class LoginFailDetectWarning0 extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>{
// 定义属性,最大连续登录失败次数
private Integer maxFailTimes;
public LoginFailDetectWarning0(Integer maxFailTimes) {
this.maxFailTimes = maxFailTimes;
}
// 定义状态: 保存2秒内所有登陆失败事件
ListState<LoginEvent> loginFailEventListState;
// 定义状态: 保存注册的定时器时间戳
ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
loginFailEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail-list", LoginEvent.class));
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
}
@Override
public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
// 判断当前登陆事件
if ( "fail".equals(value.getLoginState()) ){
// 1. 如果是失败事件, 添加到列表状态中
loginFailEventListState.add(value);
// 如果没有定时器,注册一个2秒之后的定时器
if(timerTsState.value() == null ){
Long ts = (value.getTimestamp() + 2) * 1000L;
ctx.timerService().registerEventTimeTimer(ts);
timerTsState.update(ts);
}
} else {
// 2. 如果是登陆成功, 删除定时器,清空状态,重新开始
if( timerTsState.value() != null )
ctx.timerService().deleteEventTimeTimer(timerTsState.value());
loginFailEventListState.clear();
timerTsState.clear();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<LoginFailWarning> out) throws Exception {
// 定时器触发, 说明2秒内没有登陆成功来, 判断ListState中的失败个数
ArrayList<LoginEvent> loginFailEvents = Lists.newArrayList(loginFailEventListState.get());
Integer failTimes = loginFailEvents.size();
if( failTimes >= maxFailTimes ){
// 如果超出设定的最大失败次数,输出报警
out.collect( new LoginFailWarning(ctx.getCurrentKey(),
loginFailEvents.get(0).getTimestamp(),
loginFailEvents.get(failTimes -1).getTimestamp(),
"login fail in 2s for " + failTimes + " times"
));
}
// 清空状态
loginFailEventListState.clear();
timerTsState.clear();
}
}
// 实现自定义KeyedProcessFunction
public static class LoginFailDetectWarning extends KeyedProcessFunction<Long, LoginEvent, LoginFailWarning>{
// 定义属性, 最大连续登陆失败次数
private Integer maxFailTimes;
public LoginFailDetectWarning(Integer maxFailTimes) {
this.maxFailTimes = maxFailTimes;
}
// 定义状态: 保存2秒内所有的登陆失败事件
ListState<LoginEvent> loginEventListState;
@Override
public void open(Configuration parameters) throws Exception {
loginEventListState = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("login-fail-list", LoginEvent.class));
}
@Override
public void processElement(LoginEvent value, Context ctx, Collector<LoginFailWarning> out) throws Exception {
// 判断当前事件登陆状态
if ( "fail".equals(value.getLoginState()) ){
// 1. 如果是登陆失败, 获取状态中之前的登陆失败事件, 继续判断是否已有失败事件
Iterator<LoginEvent> iterator = loginEventListState.get().iterator();
if( iterator.hasNext() ){
// 1.1 如果已经有登陆失败事件, 继续判断是否已有失败事件
// 获取已有的登陆失败事件
LoginEvent firstFailEvent = iterator.next();
if (value.getTimestamp() - firstFailEvent.getTimestamp() <= 2){
// 1.1.1 如果2秒之内, 输出报警
out.collect( new LoginFailWarning(value.getUserId(), firstFailEvent.getTimestamp(), value.getTimestamp(), "login fail 2 times in 2s"));
}
// 不管报不报警, 这次都已处理完毕,直接更新状态
loginEventListState.clear();
loginEventListState.add(value);
} else {
// 1.2 如果没有登陆失败,直接将当前事件存入ListState
loginEventListState.add(value);
}
} else {
// 2. 如果是登陆成功,直接清空状态
loginEventListState.clear();
}
}
}
}
代码:
package com.zqs.flink.project.loginfail_detect;
/**
* @remark 登陆失败监控 - CEP
*/
import com.zqs.flink.project.loginfail_detect.beans.LoginEvent;
import com.zqs.flink.project.loginfail_detect.beans.LoginFailWarning;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import sun.rmi.runtime.Log;
import java.net.URL;
import java.util.List;
import java.util.Map;
public class LoginFailWithCep {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 1. 从文件中读取数据
URL resource = LoginFailWithCep.class.getResource("/LoginLog.csv");
DataStream<LoginEvent> loginEventStream = env.readTextFile(resource.getPath())
.map(line -> {
String[] fields = line.split(",");
return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
@Override
public long extractTimestamp(LoginEvent element) {
return element.getTimestamp() * 1000L;
}
});
// 1. 定义一个匹配模式
// firstFail -> secondFail, within 2s
Pattern<LoginEvent, LoginEvent> loginFailPattern0 = Pattern
.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getLoginState());
}
})
.next("secondFail").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getLoginState());
}
})
.next("thirdFail").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getLoginState());
}
})
.within(Time.seconds(3));
Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern
.<LoginEvent>begin("failEvents").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getLoginState());
}
}).times(3).consecutive()
.within(Time.seconds(5));
// 2. 将匹配模式应用到数据流上,得到一个pattern stream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);
// 3. 检出符合匹配条件的复杂事件,进行转换处理,得到报警信息
SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());
warningStream.print();
env.execute("login fail detect with cep job");
}
// 实现自定义的PatternSelectFunction
public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning>{
@Override
public LoginFailWarning select(Map<String, List<LoginEvent>> pattern) throws Exception {
LoginEvent firstFailEvent = pattern.get("failEvents").get(0);
LoginEvent lastFailEvent = pattern.get("failEvents").get(pattern.get("failEvents").size() - 1);
return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp(), "login fail " + pattern.get("failEvents").size() + " times");
}
}
}
测试记录:
1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势。尽管 SQL
1.概述 转载:Flink 源码阅读笔记(6)- 计算资源管理 在 Flink 中,计算资源的是以 Slot 作为基本单位进行分配的。本文将对 Flink 中计算资源的管理机制加以分析。 2.Task
1.概述 转载:Flink jvm参数配置GC日志 生产环境上,或者其他要测试 GC 问题的环境上,一定会配置上打印GC日志的参数,便于分析 GC 相关的问题。 但是可能很多人配置的都不够“完美”,要
1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl
1.概述 转载:Flink SQL代码生成与UDF重复调用的优化 2. 代码生成简介 代码生成(code generation)是当今各种数据库和数据处理引擎广泛采用的物理执行层技术之一。通过代码生成
1.概述 转载:面向流批一体的 Flink Runtime 新进展 首先是关于调度部分的性能优化。Flink 由于存在 all to all 的连接关系,两个并发为 n 的算子之间会有 n² 条边,这
在Fink源码中,有flink-stream-java和flink-stream-scala模块。 flink streaming 为什么需要两个模块? https://github.com/apac
我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
我开发了一个 Flink 作业并使用 Apache Flink 仪表板提交了我的作业。根据我的理解,当我提交作业时,我的 jar 应该在 Flink 服务器上可用。我试图找出我的 jar 的路径,但无
1.概述 转载:Flink 源码阅读笔记(4)- RPC 相关文章: 【Flink】Flink 源码之RPC调用 Flink】FLink 通讯组件 RPC 作为一个分布式系统,Flink 内部不同组件
1.概述 转载并且补充: flink keyby 分布不均匀问题 我使用随机数random.nextint(8)作为key,生成keyedstream之后,直接sink到存储中,但是sink算子只有四
1.概述 转载:Flink Sort-Shuffle写流程简析 转载并且补充。 2.配置 taskmanager.network.sort-shuffle.min-parallelism 核心配置。设
1.概述 转载:Flink源码分析——批处理模式Map端数据聚合 在flink的批处理模式下,数据的计算也有着map/reduce两端的计算模型,这一点和MR、spark计算框架是类似的。在数据进行分
1.概述 转载:Flink on yarn 远程调试 大家好,我是 JasonLee。 前几天有小伙伴问我,我写的 Flink 代码是提交到 yarn 上去运行的,那我怎么能远程调试代码呢?在本地调试
当我使用 flink 事件时间窗口时,窗口就是不触发。请问如何解决,有什么debug的方法吗? 最佳答案 由于您使用的是事件时间窗口,所以很可能是水印问题。该窗口仅在水印取得进展时输出。事件时间没有提
我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。所以 2 个流程看起来像 Source1 -> operator1 -> Sink1 Source2 -> operator2 -> S
我们正在尝试构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。通过阅读文档,在我看来,Flink 广播状态很适合这种情况。 作为实验,我构建了一个简化版本:假设我有一
我有一个 Flink Streaming 作业,它失败了,我得到如下日志。谁能告诉我如何解决这个问题?有时运行一天就失效,有时运行几个小时就失效。 09:30:25 948 INFO (org.ap
我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafk
我是一名优秀的程序员,十分优秀!