gpt4 book ai didi

java - 在 Flink CEP 中并行处理一个流数据上的多个模式

转载 作者:太空宇宙 更新时间:2023-11-04 09:39:16 26 4
gpt4 key购买 nike

我有以下用例。

有一台机器正在向 Kafka 发送事件流,CEP 引擎 接收这些事件流,当流数据满足条件时会生成警告。

FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<Event>(kafkaInputTopic, new EventDeserializationSchema(), properties);
DataStream<Event> eventStream = env.addSource(kafkaSource);

事件 POJO 包含 id、名称、时间、ip。

机器将向 Kafka 发送大量数据,并且机器中有 35 个唯一的事件名称(如 name1、name2 ..... name35),我想检测每个事件名称组合的模式(如 name1 与 name2 共同出现、name1 与 name3 共同出现等)。我一共得到了1225种组合。

规则 POJO 包含 e1Name 和 e2Name。

List<Rule> ruleList -> It contains 1225 rules.

for (Rule rule : ruleList) {
Pattern<Event, ?> warningPattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {

@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE1Name())) {
return true;
}
return false;
}

}).followedBy("next").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE2Name())) {
return true;
}
return false;
}
}).within(Time.seconds(30));
PatternStream patternStream = CEP.pattern(eventStream, warningPattern);
}

这是在一个流数据上执行多个模式的正确方法吗?或者是否有任何优化的方法来实现这一点。通过上述方法,我们遇到了 PartitionNotFoundExceptionUnknownTaskExecutorException 以及内存问题。

最佳答案

在我看来,你不需要模式来实现你的目标。您可以定义一个到源的有状态映射函数,它将事件名称映射为对(最新的两个名称)。之后,将源窗口设置为 30 秒,并将简单的 WordCount 示例应用于源。

有状态 map 函数可以是这样的(仅接受事件名称,您需要根据您的输入 - 提取事件名称等进行更改):

public class TupleMap implements MapFunction<String, Tuple2<String, Integer>>{
Tuple2<String, String> latestTuple = new Tuple2<String, String>();

public Tuple2<String, Integer> map(String value) throws Exception {
this.latestTuple.f0 = this.latestTuple.f1;
this.latestTuple.f1 = value;
return new Tuple2<String, Integer>(this.latestTuple.f0 + this.latestTuple.f1, 1);
}
}

并且可以像这样获得事件名称对和出现计数作为元组的结果(也许写入kafka接收器?):

DataStream<Tuple2<String, Integer>> source = stream.map(new TupleMap());
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(0).timeWindow(Time.seconds(30)).sum(1);

关于java - 在 Flink CEP 中并行处理一个流数据上的多个模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56161122/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com