
python - Is it possible to scale data by group in Spark?


I want to scale my data with StandardScaler (from pyspark.mllib.feature import StandardScaler). Right now I can do this by passing the values of the RDD to the transform function, but the problem is that I want to preserve the keys. Is there any way to scale my data while preserving its keys?

Sample dataset

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf.

Imports

import sys
import os
from collections import OrderedDict
from numpy import array
from math import sqrt

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.feature import StandardScaler
    from pyspark.statcounter import StatCounter

    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Can not import Spark Modules", e)
    sys.exit(1)

Part of the code

sc = SparkContext(conf=conf)
raw_data = sc.textFile(data_file)
parsed_data = raw_data.map(Parseline)

The Parseline function:

def Parseline(line):
    line_split = line.split(",")
    clean_line_split = [line_split[0]] + line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))
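
For reference, scaling only the values the way I currently do looks roughly like this (a rough sketch; the withMean/withStd settings are just illustrative). It produces scaled vectors but drops the label I use as the key:

from pyspark.mllib.feature import StandardScaler

# Keep only the feature vectors; this is where the key gets lost.
features = parsed_data.values()
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
scaled_features = scaler.transform(features)  # RDD of scaled vectors, no keys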

Best answer

It is not a particularly nice solution, but you can adapt my answer to the similar Scala question. Let's start with some example data:

import numpy as np

np.random.seed(323)

keys = ["foo"] * 50 + ["bar"] * 50
values = (
    np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) +
    np.random.rand(100, 10)
)

rdd = sc.parallelize(zip(keys, values))

Unfortunately, MultivariateStatisticalSummary is just a wrapper around a JVM model and it is not really Python-friendly. Fortunately, since the values are NumPy arrays, we can use the standard StatCounter to compute statistics by key:

from pyspark.statcounter import StatCounter

def compute_stats(rdd):
    return rdd.aggregateByKey(
        StatCounter(), StatCounter.merge, StatCounter.mergeStats
    ).collectAsMap()
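
The result is just a plain dict mapping each key to a StatCounter, so you can inspect the per-group statistics directly; for example (mean and stdev come back as element-wise NumPy arrays here):

stats = compute_stats(rdd)

stats["foo"].mean()   # per-feature means for the "foo" group
stats["foo"].stdev()  # per-feature standard deviations for the "foo" group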

Finally we can map to normalize:

def scale(rdd, stats):
    def scale_(kv):
        k, v = kv
        return (v - stats[k].mean()) / stats[k].stdev()
    return rdd.map(scale_)

scaled = scale(rdd, compute_stats(rdd))
scaled.first()

## array([ 1.59879188, -1.66816084,  1.38546532,  1.76122047,  1.48132643,
##         0.01512487,  1.49336769,  0.47765982, -1.04271866,  1.55288814])
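
As a quick sanity check you can re-attach the keys to the scaled vectors and reuse compute_stats; after scaling, every group's per-feature mean should be roughly 0 and its stdev roughly 1 (a sketch that relies on zip pairing elements in order, which holds here because both RDDs are narrow maps of the same parent):

# Re-key the scaled vectors and verify the per-group statistics.
rekeyed = rdd.keys().zip(scaled)
check = compute_stats(rekeyed)

check["foo"].mean()   # close to 0 for every feature
check["bar"].stdev()  # close to 1 for every feature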

Regarding "python - Is it possible to scale data by group in Spark?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36358233/
