
performance - Spark last 30 days filter, best way to improve performance


I have an RDD of records, converted to a DataFrame, and I want to filter by day using the timestamp and compute statistics for each of the last 30 days, filtering by column and counting the results.

The Spark application is very fast until it reaches the for loop, so I would like to know whether this approach is an anti-pattern and how I can get good performance. Should I use Spark's cartesian?

// FILTER PROJECT RECORDS: keep only rows whose rowkey contains the client id
val clientRecordsDF = recordsDF.filter($"rowkey".contains("" + client_id))
val client_records_total = clientRecordsDF.count().toLong

This is the content of clientRecordsDF:

root
|-- rowkey: string (nullable = true) //CLIENT_ID-RECORD_ID
|-- record_type: string (nullable = true)
|-- device: string (nullable = true)
|-- timestamp: long (nullable = false) // MILLISECOND
|-- datestring: string (nullable = true) // yyyyMMdd

[1-575e7f80673a0,login,desktop,1465810816424,20160613]
[1-575e95fc34568,login,desktop,1465816572216,20160613]
[1-575ef88324eb7,registration,desktop,1465841795153,20160613]
[1-575efe444d2be,registration,desktop,1465843268317,20160613]
[1-575e6b6f46e26,login,desktop,1465805679292,20160613]
[1-575e960ee340f,login,desktop,1465816590932,20160613]
[1-575f1128670e7,action,mobile-phone,1465848104423,20160613]
[1-575c9a01b67fb,registration,mobile-phone,1465686529750,20160612]
[1-575dcfbb109d2,registration,mobile-phone,1465765819069,20160612]
[1-575dcbcb9021c,registration,desktop,1465764811593,20160612]
...


The for loop with bad performance:

import java.util.{Calendar, TimeZone}

val gmt = TimeZone.getTimeZone("GMT")

for (dayCounter <- 1 to 30) {
  // LAST 30 DAYS

  // CREATE DAY TIMESTAMP: midnight GMT, dayCounter days ago
  val cal = Calendar.getInstance(gmt)
  cal.add(Calendar.DATE, -dayCounter)
  cal.set(Calendar.HOUR_OF_DAY, 0)
  cal.set(Calendar.MINUTE, 0)
  cal.set(Calendar.SECOND, 0)
  cal.set(Calendar.MILLISECOND, 0)
  val dayTime = cal.getTimeInMillis()

  // end of the same day, inclusive
  cal.set(Calendar.HOUR_OF_DAY, 23)
  cal.set(Calendar.MINUTE, 59)
  cal.set(Calendar.SECOND, 59)
  cal.set(Calendar.MILLISECOND, 999)
  val dayTimeEnd = cal.getTimeInMillis()

  // FILTER PROJECT RECORDS down to that single day
  val dailyClientRecordsDF = clientRecordsDF.filter(
    $"timestamp" >= dayTime && $"timestamp" <= dayTimeEnd
  )
  val daily_client_records = dailyClientRecordsDF.count().toLong

  println("dayCounter " + dayCounter + " records = " + daily_client_records)

  // perform other filters on dailyClientRecordsDF
  // save daily statistics to hbase
}
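
Each count() above is a separate Spark action, so the lineage of clientRecordsDF (including the rowkey filter) is recomputed from the source on every one of the 30 iterations. A minimal mitigation, assuming the per-client records fit in cluster memory, is to cache the DataFrame once before the loop:

// Materialize the per-client records once so the 30 count() actions
// read from cache instead of recomputing the filter over the source.
clientRecordsDF.cache()
clientRecordsDF.count() // first action populates the cache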

Best Answer

Creating a UDF should be avoided in almost all cases; doing so prevents the Catalyst Optimizer from handling the query properly.

Instead, use the built-in SQL functions:

(
  spark.read.table("table_1")
    .join(
      spark.read.table("table_2"),
      "user_id"
    )
    .where("p_eventdate > current_date() - 30")
)
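
Applied to the question's clientRecordsDF, the same idea removes the loop entirely: filter to the 30-day window once and aggregate per day, so the data is scanned in a single job instead of 30. This is a sketch under the question's schema, assuming datestring is a faithful yyyyMMdd rendering of timestamp and that spark.implicits._ is in scope for the $ syntax:

import java.util.{Calendar, TimeZone}

// Millisecond timestamp of midnight GMT, 30 days ago (same arithmetic as the loop).
val cal = Calendar.getInstance(TimeZone.getTimeZone("GMT"))
cal.add(Calendar.DATE, -30)
cal.set(Calendar.HOUR_OF_DAY, 0)
cal.set(Calendar.MINUTE, 0)
cal.set(Calendar.SECOND, 0)
cal.set(Calendar.MILLISECOND, 0)
val windowStart = cal.getTimeInMillis()

// One job instead of 30: filter once, then count records per day.
val dailyCounts = clientRecordsDF
  .filter($"timestamp" >= windowStart)
  .groupBy($"datestring")
  .count()
  .orderBy($"datestring")

dailyCounts.show(30)

Days with no records are simply absent from the result, whereas the loop printed an explicit zero for them; any further per-day statistics can be expressed as extra aggregates in the same groupBy rather than as separate filters.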

Regarding "performance - Spark last 30 days filter, best way to improve performance", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37826054/
