- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
当特征数量或者模型数量很多的时候,使用 PySpark 去计算相关风控指标会节省很多的时间。网上关于使用 PySpark 计算相关风控指标的资料较少,尤其是PSI计算不管是国内还是国外相关的代码都没有正确的,这里抛砖引玉,写了三个风控常用的指标AUC,KS和PSI相关的计算方法,供参考.
AUC的相关概念网上已经有很多的很好的文章,这里不在赘述,AUC使用的到的计算公式如下:
其中 M 为负类样本的数目, N 为正类样本的数目 。
使用 PySpark 计算代码如下:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
true_y_col = 'y'
pred_y_col = 'pred_y'
date_col = 'day'
auc_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\
.select(true_y_col, pred_y_col, date_col, 'model_name')\
.withColumn('totalbad', F.sum(F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\
.withColumn('totalgood', F.sum(1-F.col(true_y_col)).over(Window.patitonBy(date_col, 'model_name').orderBy(F.lit(1))))\
.withColumn('rnk2', F.row_number().over(Window.partitionBy(date_col, 'model_name').orderBy(F.col(pred_y_col).asc())))\
.filter(F.col(true_y_col)==1)\
.groupBy(date_col, 'model_name')\
.agg(((F.sum(F.col('rnk2'))-0.5*(F.max(F.col('totalbad')))*(1+F.max(F.col('totalbad'))))/(F.max(F.col('totalbad'))*F.max(F.col('totalgood')))).alias('AUC'))\
.orderBy('model_name', date_col)
KS统计量是基于经验累积分布函数(Empirical Cumulative Distribution Function,ECDF) 建立的,一般定义为:
即为 TPR 与 FPR 差值绝对值的最大值.
KS计算方法有很多种,这里使用的是分箱法分别计算 TPR 与 FPR ,然后得到KS。 使用 PySpark 计算代码如下:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
true_y_col = 'y'
pred_y_col = 'pred_y'
date_col = 'day'
nBins = 10
ks_df = df.filter(F.col(true_y_col)>=0).filter(F.col(pred_y_col)>=0)\
.select(true_y_col, pred_y_col, date_col, 'model_name')\
.withColumn('Bin', F.ntile(nBins).over(Window.partitionBy(date_col, 'model_name').orderBy(pred_y_col)))\
.groupBy(date_col, 'model_name', 'Bin').agg(F.sum(true_y_col).alias('N_1'), F.sum(1-F.col(true_y_col)).alias('N-0'))\
.withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name')))\
.withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name')))\
.withColumn('SUM_1', F.sum('N_1').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\
.withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy(date_col, 'model_name').orderBy('Bin')))\
.withColumn('KSn', F.expr('round(abs(SUM_1/ALL_1-SUM_0/ALL_0),6)'))\
.withColumn('KS', F.round(F.max('KSn').over(Window.partitionBy(date_col, 'model_name')),6))
ks_df = ks_df.select(date_col, 'model_name', 'KS').filter(col('KS').isNotNull()).dropDuplicates()
群体稳定性指标(Population Stability Index,PSI)是风控场景常用的 验证样本 在各分数段的分布与 建模样本 分布的稳定性。在建模中,常用来 筛选特征变量、评估模型稳定性 .
计算公式如下:
其中 \(A_i\) 代表的是 第i个分箱中实际分布(actual)样本占比 ,同理 \(E_i\) 代表的是 第i个分箱中预期分布(excepted)样本占比 。
使用 PySpark 计算代码如下:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import when
date_col = 'day'
nBins = 10
feature_list = ['fea_1', 'fea_2', 'fea_3']
df = df.withColumn('flag', when(F.col(date_col) == 'actual_date', 0).when(F.col(date_col) == 'excepted_date', 1).otherwise(None)
quantitles = df.filter(F.col('flag') == 0)\
.approxQuantile(feature_list, [i/nBins for i in range(1, nBins)], 0.001) # 基准样本分箱
quantitles_dict = {col: quantitles[idx] for idx, col in enumerate(feature_list)}
f_quantitles_dict = F.create_map([F.lit(x) if isinstance(x, str) else F.array(*[F.lit(xx) for xx in x]) for i in quantitles_dict.items() for x in i])
unpivotExpr = "stack(3, 'fea_1', fea_1, 'fea_2', fea_2, 'fea_3', fea_3)"
psi_df = df.filter(F.col('flag').isNotNull()).select('flag', F.expr(unpivotExpr))\
.withColumn('Bin', when(F.col('value').isNull(), 'Missing').otherwise(
when(F.col('value') < f_quantitles_dict[F.col('varname')][0], 'bin_0')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][1], 'bin_1')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][2], 'bin_2')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][3], 'bin_3')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][4], 'bin_4')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][5], 'bin_5')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][6], 'bin_6')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][7], 'bin_7')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_8')
.when(F.col('value') < f_quantitles_dict[F.col('varname')][8], 'bin_9')))\
.groupBy('varname', 'Bin').agg(F.sum('flag').alias('N_1'), F.sum(1-F.col('flag')).alias('N_0'))\
.withColumn('ALL_1', F.sum('N_1').over(Window.partitionBy('varname')))\
.withColumn('ALL_0', F.sum('N_0').over(Window.partitionBy('varname')))\
.withColumn('actual', F.expr('round(N_0/ALL_0, 6)'))\
.withColumn('excepted', F.expr('round(N_1/ALL_1, 6)'))\
.withColumn('PSIn', F.expr('round((actual-excepted)*ln(actual/excepted), 6'))\
.withColumn('PSI', F.round(F.sum('PSIn').over(Window.partitionBy('varname')), 6))
最后此篇关于使用PySpark计算AUC,KS与PSI的文章就讲到这里了,如果你想了解更多关于使用PySpark计算AUC,KS与PSI的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
您能否将 scipy.stats 中的 kstest 用于非标准分布函数(即改变 Students t 的 DOF,或改变 Cauchy 的 gamma)?我的最终目标是为我的分布拟合找到最大 p 值
所以我有一些股票价格数据,我想测试价格是否遵循对数正态分布。我的代码如下: import scipy.stats as stats print(stats.kstest(df['DJIA'], "lo
如何在 R 中将 465456.6789 等数字格式化为漂亮的 465,4K?其他例子 13567.566 到 13,5K 3567.5 到 3,5K 等等。一般来说,我想要类似的东西 roundup
创建新数据库时,我必须设置排序规则类型或设置其默认值....很好。 但实际上我需要知道 Kanatype Sensitive(KS) 和宽度敏感是什么意思,我知道例如区分大小写意味着字母对大写和小写敏
我正在尝试使用 scipy 中的 ks_2samp 函数运行 Kolmogorov-Smirnoff 测试,以确定数据的直方图是否来自同一分布。但有时返回的 p 值似乎不太正确... 例如这个直方图:
我在将自签名客户端证书从 Java keystore 导出到 pem 文件时遇到问题。我想让我的 C++ SSL 服务器程序验证这个自签名证书。如果我通过 openssl 创建自签名证书,它就可以工作
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题吗? 通过 editing this post 添加细节并澄清问题. 关闭 9 年前。 Improve t
我尝试使用代码安装包 ks install.packages("ks-package") library("MASS") library("ks") 但出现下一个错误: Warning message
我想在 R 中为以下示例数据帧“数据”执行两个样本 Kolmogorov-Smirnov (KS) 测试: Protein1 Protein2 Protein3 Protei
我有一个发行版,例如: d #[1] 4 22 15 5 9 5 11 15 21 14 14 23 6 9 17 2 7 10 4 或者,向量 d在 dput格式。 d <- c(
我想比较具有不同数据量和不同起点/终点的两个数据集。我想使用 KS.test,因为我在我编写的 C 程序中做了类似的事情(通过 GSL 直方图 -> GSL cdf -> 自己编写的 KS 测试比较数
我正在使用 KS 检验比较两个不同的经验累积分布函数,我想提取检验统计量最大值所在的位置(在 ECDF 中)。 问题:使用 R,是否有一种方便的方法来提取它,也许是从 ks.test 函数或其他地方?
我的简单问题是:如何在两个数据帧之间逐列执行 ks.test? 例如。我们有两个数据框: D1 <- data.frame(D$Ag, D$Al, D$As, D$Ba, D$Be, D$Ca, D$
我有一个 key.ks 文件需要打开。有人可以建议如何在 Windows 中打开此文件。我可以使用 Keytool 命令吗? 最佳答案 是的,你可以,只要它是 Java Keytool 可以理解的格式
我试图编写一个简单的 bash 脚本来打印一堆随机字符: for i in {1..100000} do echo -n $(printf \\$(printf '%03o' $(( ( RA
对于 ASIO 和 Windows WDM-KS 主机 API,PortAudio 显示 deviceCount 为 0 和 defaultOutputDevice 为 -1。我确实成功地构建了 Po
我有关于恒星金属丰度的数据,我想将其与学生的 t 分布进行比较。为此,我在 python 上使用 scipy.stats.kstest 运行 Kolmogorov-Smirnov 测试 KSstude
当指定 %pre 脚本时,它是否位于 kickstart 文件中的特定位置?我认为它应该在开头,以便稍后您可以使用 %include 标记来包含脚本文件创建的配置。 或者它可能在其他任何事情之前自动处
我一直在尝试让虚拟机在 centos 上使用 cfg 文件,但不幸的是,我收到 ks.cfg 文件不存在的错误。 下面是我为启用 VM 而运行的命令。 virt-install --name Fedo
我需要你的帮助来确定为什么第 5 步(接近尾声)是必要的。 我有一组有效的步骤来创建一个 keystore.ks,其中包含一个包含本地证书颁发机构证书的链。主tomcat(客户端)和从tomcat(服
我是一名优秀的程序员,十分优秀!