gpt4 book ai didi

algorithm - 提高 Apache Spark 中重叠观察的置信度

转载 作者:塔克拉玛干 更新时间:2023-11-03 06:14:36 24 4
gpt4 key购买 nike

我是 scala/spark 的新手,如果我的问题很初级,请原谅我,但我到处搜索都找不到答案。

问题

我正在尝试提高一堆网络路由器观察结果的置信度分数(在不同网络连接点观察可能的路由器类型)。

我有一个类型 NetblockObservation 将网络上看到的设备类型与关联的网络 block 和置信度结合起来。置信度是我们准确识别所见设备的置信度。

case class NetblockObservation(
device_type: String
ip_start: Long,
ip_end: Long,
confidence: Double
)

如果置信度高于某个阈值 thresh,那么我希望该观察结果包含在返回的数据集中。如果低于 thresh,则不应低于。

此外,如果我有两个具有相同 device_type 的观察结果,并且其中一个包含另一个,则被容器的置信度应该增加容器的置信度。

示例

假设我有 3 个 Netblock 观察

// 0.0.0.0/28
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 15, confidence_score: .4)
// 0.0.0.0/29
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 7, confidence_score: .4)
// 0.0.0.0/30
NetblockObservation(device_type: "x", ip_start: 0, ip_end: 3, confidence_score: .4)

如果置信度阈值为 1,我希望得到单个输出 NetblockObservation(device_type: "x", ip_start: 0, ip_end: 4, confidence_score: 1.2)

说明:如果 NetblockObservation 包含且具有相同的 device_type,我可以将其置信度得分加在一起

我被允许将 0.0.0.0/29 的置信度分数添加到 0.0.0.0/30 的置信度中,因为它包含在其中。

我不允许将 0.0.0.0/30 的置信度分数添加到 0.0.0.0/29 因为 0.0.0.0/29 不包含在 0.0.0.0/30 中。

我(可怜的)尝试

失败原因:太慢/从未完成

我试图在学习 scala/spark 的同时实现这一点,所以我不确定是这个想法还是错误的实现。我认为它最终会起作用,但一个小时后,它还没有在大小为 300,000(与生产规模相比较小)的数据集上完成,所以我放弃了。

想法是找到最大的网络 block 并将数据分成包含的网络 block 和不包含的网络 block 。未包含的网络 block 递归传递回同一函数。如果最大的网络 block 的 confidence_score 为 1,则忽略整个包含的数据集,并将最大的添加到返回数据集中。如果 confidence_score 小于 1,则其 confidence_score 将添加到包含的数据集中的所有内容,并且该组将递归传递回同一函数。最终,您应该只剩下 confidence_score 大于 1 的数据。该算法还存在未考虑 device_type 的问题。

def handleDataset(largestInNetData: Option[NetblockObservation], netData: RDD[NetblockObservation]): RDD[NetblockObservation] = {
if (netData.isEmpty) spark.sparkContext.emptyRDD else largestInNetData match {
case Some(largest) =>
val grouped = netData.groupBy(item =>
if (item.ip_start >= largest.ip_start && item.ip_end <= largest.ip_end) largestInNetData
else None)

def lookup(k: Option[NetblockObservation]) = grouped.filter(_._1 == k).flatMap(_._2)

val nos = handleDataset(None, lookup(None))
// Threshold is assumed to be 1
val next = if (largest.confidence_score >= 1) spark.sparkContext.parallelize(Seq(largest)) else
handleDataset(None, lookup(largestInNetData)
.filter(x => x != largest)
.map(x => x.copy(confidence_score = x.confidence_score + largest.confidence_score)))
nos ++ next
case None =>
val largest = netData.reduce((a: NetblockObservation, b: NetblockObservation) => if ((a.ip_end - a.ip_start) > (b.ip_end - b.ip_start)) a else b)
handleDataset(Option(largest), netData)
}
}

最佳答案

这是一段相当复杂的代码,所以这里是一个通用算法,希望对您有所帮助:

  • 暂时忘掉 Spark 并编写一个 Scala 函数,可能在 NetblockObservation 的伴随对象中,它获取它们的集合并返回包含的该集合的一个子集。您应该对该函数进行单元测试,这又是纯 Scala。
  • 现在转到 Spark。在你的 RDD[NetblockObservation] 上做一个 groupBydevice_type 为键产生一个 StringIterable[NetblockObservation].
  • 过滤掉 map 中所有大小为 1 的条目并且置信度低于thresh
  • 对于剩余的条目,将第一步中的函数应用到具有 mapValuesNetblockObservation 集合。
  • 执行 reduceByKey 或类似操作,简单地将包含值的 confidence_score 相加。
  • 享用清凉饮料。

关于algorithm - 提高 Apache Spark 中重叠观察的置信度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43003433/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com