gpt4 book ai didi

apache-spark - 从事件流中查找事件的子序列

转载 作者:行者123 更新时间:2023-12-03 21:34:30 26 4
gpt4 key购买 nike

我在下面给出了我的问题的微型版本

我有 2 个不同的传感器以流的形式发送 1/0 值。我能够使用 Kafka 使用流并将其带入 spark 进行处理。请注意我在下面给出的示例流。

时间 --------------> 1 2 3 4 5 6 7 8 9 10

传感器名称--> A A B B B B A B 一个

传感器值 ---> 1 0 1 0 1 0 0 1 1 0

我想确定此流中出现的子序列模式。例如,如果 A =0 并且流中的下一个值(基于时间)是 B =1,那么我想推送警报。在上面的示例中,我突出显示了 2 个地方——我想发出警报的地方。一般来说它会像

“If a set of sensor-event combination happens within a time interval, raise an alert”.



我是 Spark 新手,不知道 Scala。我目前正在使用 python 进行编码。

我的实际问题包含更多传感器,每个传感器可以有不同的值组合。意思是我的子序列和事件流

我尝试了几个选项但没有成功
  • 窗口函数 - 可用于移动平均累积总和
    等不适用于此用例
  • 将 spark Dataframes/RDDs 带到本地 python 结构,如列表
    和 Pandas 数据帧并进行子测序——这需要很多
    一些迭代后,随机播放和触发事件流排队
  • UpdateStatewithKey - 尝试了几种方法,但无法理解
    完全如何工作以及这是否适用于此用途
    案件。
  • 最佳答案

    任何寻找此问题解决方案的人都可以使用我的解决方案:
    1- 为了让它们保持连接,您需要使用 collect_list 收集事件。
    2- 最好在 collect_list 上对您的事件进行排序,但要小心,因为它按第一列排列数据,因此将 DateTime 放在该列中很重要。
    3- 例如,我从 collect_list 中删除了 DateTime。
    4- 最后,您应该联系所有元素以使用字符串函数(如包含)来探索它以找到您的子序列。

    .agg(expr("array_join(TRANSFORM(array_sort(collect_list((Time , Sensor Value))), a -> a.Time ),'')")as "MySequence")
    在此 agg 函数之后,您可以使用任何正则表达式或字符串函数来检测您的模式。
    查看此链接以获取有关 collect_list 的更多信息:
    collect list
    检查此链接以获取有关对 collect_list 进行排序的更多信息:
    sort a collect list

    关于apache-spark - 从事件流中查找事件的子序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37922199/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com