- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想计算大型数据集(100 万行)的相关矩阵。这个想法是计算产品销售的相关性。如果两种产品的销售额同比增减相似,则可能存在相关性。
我已经试过这里的帖子了:
它们或多或少都做同样的事情,但它们会在驱动程序处收集相关矩阵。这是一个问题,因为大型数据集使该集合RAM 密集。我正在寻找一种方法将这个问题分解成多个部分并利用 Spark 的分布式计算。有 170,000 种不同的产品,因此作业运行了 170,000 次,并且有 29B 种组合。
我的想法是逐列计算相关性(交叉应用),然后将其收集到数据框(或 RDD)中以对其运行过滤器(仅相关性 > 0.8)。但我没有好的想法开始。
数据集基本上是这样的。
d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)
我将数据转置为列中的年份。
df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)
我计算出 pct_change 具有逐年的相对变化。
df_diff = df.pct_change(axis=1).replace([np.inf, -np.inf], np.nan).fillna(0)
Year 2010 2011 2012
Product
A 0.0 0.100000 0.090909
B 0.0 -0.050000 0.157895
C 0.0 0.066667 0.093750
我需要相关性...有了 Pandas 很容易
# change structure
df_diff = df_diff.stack().unstack(level=0)
# get correlation
df_diff = df_diff.corr().abs()
# change structure back
df_diff = df_diff.unstack().to_frame(name='value')
df_diff.index = df_diff.index.set_names(['Product_1', 'Product_2'])
df_diff.reset_index(inplace=True)
Product_1 Product_2 value
0 A A 1.000000
1 A B 0.207317
2 A C 0.933485
3 B A 0.207317
4 B B 1.000000
5 B C 0.544352
6 C A 0.933485
7 C B 0.544352
8 C C 1.000000
最佳答案
我使用了一个 udf 并将其映射到 spark df。使用 numOfPartitions
,您可以控制生成并分发到工作节点的任务数。在我的示例中,我使用了 16 个节点,每个节点有 8 个 cpu,并将 df 分成 10000 个分区。
import pandas as pd
import numpy as np
d = {'Product': ['A', 'B', 'C','A', 'B', 'C','A', 'B', 'C'],\
'Year': [2010, 2010, 2010, 2011, 2011, 2011, 2012, 2012, 2012],\
'Revenue': [100, 200, 300, 110, 190, 320, 120, 220, 350]}
df = pd.DataFrame(data=d)
df = df.pivot(index='Product', columns='Year', values='Revenue').fillna(0)
df_diff = df.pct_change(axis=1, limit=1).replace([np.inf, -np.inf], np.nan).fillna(0)
df_diff = df_diff.dropna(how='all')
# pivot columns and rows to have year on rows and product on columns
df_diff_piv = df_diff.stack().unstack(level=0).sort_index()
# bring to spark df
df_diff_spark = spark.createDataFrame(df_diff.reset_index())
# correlate on at least x periods
correlation_min_periods = 1 # I used 10 for a 20 periods dataset
# set num of partitions to parallelize on tasks
numOfPartitions = 200 #200 is default
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, FloatType
schema = StructType(
[
StructField("Product_1", StringType()),
StructField("Product_2", StringType()),
StructField("corr", StringType()) #cant get it to work on FloatType()
]
)
def calculate_correlation(product):
data = df_diff_piv
arr = []
for col in data.columns:
m1 = product
m2 = data[col].name
c = np.absolute(data[product].corr(data[col])) #, min_periods=correlation_min_periods
arr.append([m1, m2, str(c)]) #cant get it to work on FloatType()
return arr
#register udf
spark.udf.register("calculate_correlation_udf", calculate_correlation)
calculate_correlation_udf = udf(calculate_correlation, ArrayType(schema))
#apply udf to distinct product
distinct_product = df_diff_spark.select("Product").distinct().repartition(numOfPartitions)
res = distinct_product.select("Product", calculate_correlation_udf("Product").alias("corr_matrix"))
from pyspark.sql.functions import explode
# explode (flatten) array and struct back to dataframe
expl = res.select(explode("corr_matrix").alias("corr_row"))
rowlevel = expl.select("corr_row.Product_1","corr_row.Product_2","corr_row.corr")
# convert string to float
rowlevel = rowlevel.withColumn("corr", rowlevel["corr"].cast(FloatType()))
rowlevel.show()
关于pandas - 使用大型数据集在 pyspark 中获取相关矩阵,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60430645/
我在数据框中有一列月份数字,想将其更改为月份名称,所以我使用了这个: df['monthName'] = df['monthNumber'].apply(lambda x: calendar.mont
Pyspark 中是否有一个 input() 函数,我可以通过它获取控制台输入。如果是,请详细说明一下。 如何在 PySpark 中编写以下代码: directory_change = input("
我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有未与架构确认的数据。 stringDf = sparkSession.cr
我正在开发基于一组 ORC 文件的 spark 数据框的 sql 查询。程序是这样的: from pyspark.sql import SparkSession spark_session = Spa
我有一个 Pyspark 数据框( 原始数据框 )具有以下数据(所有列都有 字符串 数据类型): id Value 1 103 2
我有一台配置了Redis和Maven的服务器 然后我执行以下sparkSession spark = pyspark .sql .SparkSession .builder .master('loca
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有: +---+-------+-------+ | id| var1| var2| +---+-------+-------+ | a|[1,2,3]|[1,2,3]| | b|[2,
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有一个带有多个数字列的 pyspark DF,我想为每一列根据每个变量计算该行的十分位数或其他分位数等级。 这对 Pandas 来说很简单,因为我们可以使用 qcut 函数为每个变量创建一个新列,如
我有以下使用 pyspark.ml 包进行线性回归的代码。但是,当模型适合时,我在最后一行收到此错误消息: IllegalArgumentException: u'requirement failed
我有一个由 | 分隔的平面文件(管道),没有引号字符。示例数据如下所示: SOME_NUMBER|SOME_MULTILINE_STRING|SOME_STRING 23|multiline text
给定如下模式: root |-- first_name: string |-- last_name: string |-- degrees: array | |-- element: struc
我有一个 pyspark 数据框如下(这只是一个简化的例子,我的实际数据框有数百列): col1,col2,......,col_with_fix_header 1,2,.......,3 4,5,.
我有一个数据框 +------+--------------------+-----------------+---- | id| titulo |tipo | formac
我从 Spark 数组“df_spark”开始: from pyspark.sql import SparkSession import pandas as pd import numpy as np
如何根据行号/行索引值删除 Pyspark 中的行值? 我是 Pyspark(和编码)的新手——我尝试编码一些东西,但它不起作用。 最佳答案 您不能删除特定的列,但您可以使用 filter 或其别名
我有一个循环生成多个因子表的输出并将列名存储在列表中: | id | f_1a | f_2a | |:---|:----:|:-----| |1 |1.2 |0.95 | |2 |0.7
我正在尝试将 hql 脚本转换为 pyspark。我正在努力如何在 groupby 子句之后的聚合中实现 case when 语句的总和。例如。 dataframe1 = dataframe0.gro
我想添加新的 2 列值服务 arr 第一个和第二个值 但我收到错误: Field name should be String Literal, but it's 0; production_targe
我是一名优秀的程序员,十分优秀!