gpt4 book ai didi

scala - 如何比较两个数据集?

转载 作者:行者123 更新时间:2023-12-04 01:06:47 32 4
gpt4 key购买 nike

我正在运行一个 spark 应用程序,它从几个配置单元表(IP 地址)读取数据,并将数据集中的每个元素(IP 地址)与其他数据集中的所有其他元素(IP 地址)进行比较。最终结果将是这样的:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| ip_address|dataset1|dataset2 |dataset3 |dataset4 |dataset5 |dataset6| date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx| 1 | 1| 0| 0| 0| 0 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 0| 0| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 1| 0| 1| 0| 0 |2017-11-06|
---------------------------------------------------------------------------------------------------

为了进行比较,我正在转换 dataframes产生于 hiveContext.sql("query")声明成 Fastutil对象。像这样:
val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

然后,我正在使用 iterator迭代每个集合并使用 FileWriter 将行写入文件.
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
val p = dfIterator.next().toString
//logic
}

我正在使用 --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g 运行应用程序

该过程总共运行约 18-19 小时,每天对约 4-5 百万条记录进行一对一比较。

但是,当我检查 Application Master UI 时,我注意到在初始转换 dataframes 后没有任何事件发生。至 fastutil collection objects完成(这在作业启动后只需要几分钟)。我看到了 countcollect代码中使用的语句生成新作业,直到转换完成。之后,在比较运行时不会启动新作业。
  • 这意味着什么?这是否意味着分布式处理是
    根本没有发生?
  • 我知道集合对象不被视为 RDD,可以
    这就是原因吗?
  • spark如何在不使用资源的情况下执行我的程序
    分配?

  • 任何帮助将不胜感激,谢谢!

    最佳答案

    行后:

    val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

    特别是上一行的那部分:
    df.map(r => r(0).toString).collect()

    其中 collect最重要的是要注意,没有在 dfBuffer 上执行 Spark 作业。 (这是一个常规的本地 JVM 数据结构)。

    Does it mean that the distributed processing is not happening at all?



    正确的。 collect将所有数据放在驱动程序运行的单个 JVM 上(这正是您不应该这样做的原因,除非……您知道自己在做什么以及它可能导致什么问题)。

    我认为以上回答了所有其他问题。

    比较两个数据集(以 Spark 和分布式方式)的问题的可能解决方案是 join具有引用数据集和 count 的数据集比较记录数是否没有变化。

    关于scala - 如何比较两个数据集?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49143277/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com