gpt4 book ai didi

apache-spark - 流作业中窗口函数的性能不佳

转载 作者:行者123 更新时间:2023-12-04 04:12:34 26 4
gpt4 key购买 nike

我使用 Spark 2.0.2、Kafka 0.10.1 和 spark-streaming-kafka-0-8 集成。我想执行以下操作:

我从 NetFlow 连接中提取流作业中的特征,然后将记录应用于 k-means 模型。一些特征是直接从记录中计算的简单特征。但我也有更复杂的功能,这些功能取决于之前指定时间窗口的记录。他们计算在最后一秒有多少连接与当前连接到同一主机或服务。为此,我决定使用 SQL 窗口函数。

所以我建立窗口规范:

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

还有一个函数被调用以在每个批处理上提取此特征:

def extractTrafficFeatures(dataset: Dataset[Row]) = {
dataset
.withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
.withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}

并按如下方式使用该函数

stream.map(...).map(...).foreachRDD { rdd =>
val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
...
}

问题是这样的性能很差。对于低于每秒 100 条记录的平均输入速率,一个批处理需要 1 到 3 秒。我猜它来自分区,它会产生很多洗牌?

我尝试使用 RDD API 和 countByValueAndWindow()。这似乎要快得多,但使用 DataFrame API,代码看起来更漂亮、更清晰。

有没有更好的方法来计算流式数据上的这些特征?还是我在这里做错了什么?

最佳答案

这里的性能相对较低是可以预料的。您的代码必须对数据进行两次洗牌和排序,一次用于:

Window
.partitionBy("plainrecord.ip_dst")
.orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

一次用于:

Window
.partitionBy("service")
.orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

这将对运行时产生巨大影响,如果这些是硬性要求,您将无法做得更好。

关于apache-spark - 流作业中窗口函数的性能不佳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41744641/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com