
python - Windowed aggregation over multiple columns in Spark


I'm having trouble aggregating across multiple columns in PySpark. There are hundreds of boolean columns showing the current state of a system, with a new row added every second. The goal is to transform this data to show the number of state changes for each 10-second window.
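
To make the goal concrete, here is a minimal pure-Python sketch for a single column (the sample values are made up): a change is flagged by XORing each reading with the previous one, and the flags are then summed per 10-second bucket.

# Minimal single-column sketch of the intended transformation (made-up values).
states = [0, 1, 1, 0, 0, 1, 1, 1, 0, 0,   # seconds 0-9
          1, 1, 0, 0, 0, 0, 1, 0, 0, 0]   # seconds 10-19
changes = [curr ^ prev for curr, prev in zip(states[1:], states)]      # per-second change flags
windows = [sum(changes[i:i + 10]) for i in range(0, len(changes), 10)]
print(windows)  # number of state changes in each 10-second window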

I planned to do this in two steps: first XOR the boolean values with those of the previous row, then sum over 10-second windows in a second pass. This is the rough code I came up with:

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time

sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)

# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(m).alias(m) for m in cols]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

With data_window partitioned by minute, Spark generates 52 stages, each depending on the previous one. Increasing num_of_cols also increases the number of stages. This seems to me like it should be an embarrassingly parallel problem: compare each row with the previous one, then aggregate by 10-second windows. Removing the partitionBy from data_window lets it run in a single stage, but it forces all the data onto a single partition to do so.
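
One way to see where those dependencies come from is to print the query plans before triggering the job; for the df built above, the physical plan lists the Window operators and the Exchange (shuffle) steps Spark inserts around them.

# Print the parsed/analyzed/optimized/physical plans for the query built above.
df.explain(True)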

Why do the stages depend on each other, and is there a better way to write this to get more parallelism? I thought it was possible to run multiple aggregations over the same window at the same time. Eventually this needs to scale to hundreds of columns; are there any tricks to improve performance at that point?

Best answer

Based on Georg's helpful reply, I came up with the following:

import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time
import pprint


sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)


@F.pandas_udf(T.ArrayType(T.IntegerType()), F.PandasUDFType.GROUPED_AGG)
def pandas_xor(v):
    values = v.values
    if len(values) == 1:
        return values[0] * False
    elif len(values) == 2:
        return values[0] ^ values[1]
    else:
        raise RuntimeError('Too many values given to pandas_xor: {}'.format(values))


# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(100000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

df = df.select('Time', F.array(*cols).alias('data'))

# XOR
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time').rowsBetween(Window.currentRow, 1)
# data_window = Window.orderBy('Time')
df = df.select('Time', pandas_xor(df.data).over(data_window).alias('data'))

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(F.element_at('data', i + 1)).alias(m) for i, m in enumerate(cols)]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)
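
For reference, the grouped-aggregate UDF above is handed, for each window frame, a pandas Series whose elements are the packed column arrays of the current row and (when present) the next row. A small standalone check of the XOR step it performs could look like this (the sample arrays are made up):

import numpy as np
import pandas as pd

# What pandas_xor sees for one frame: the current row's packed columns and the next row's.
frame = pd.Series([np.array([0, 1, 1, 0]), np.array([1, 1, 0, 0])])
print(frame.values[0] ^ frame.values[1])  # -> [1 0 1 0], per-column change flags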

Follow the instructions below to run it with Spark 3.0.0-preview2:

  1. Download Spark 3.0.0-preview2

    mkdir contrib
    wget -O contrib/spark-3.0.0-preview2.tgz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz'
    tar -C contrib -xf contrib/spark-3.0.0-preview2.tgz
    rm contrib/spark-3.0.0-preview2.tgz
  2. In the first shell, configure the environment to use Pyspark 3.0.0

    export SPARK_HOME="$(pwd)/contrib/spark-3.0.0-preview2-bin-hadoop2.7"
    export PYTHONPATH="$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip"
  3. Start the pyspark job

    time python3 so-example.py

    View the web UI of the local Spark run at: http://localhost:4040

Regarding python - Windowed aggregation over multiple columns in Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60047337/
