PySpark: calculate new rows based on previous rows of the current column and multiple other columns

Reposted by bug小助手, last updated 2023-10-25 17:26:57

I have an Excel sheet with formulas that I need to convert into PySpark code.

Consider columns A, B, C, D, E, F, G, H and I, where columns F, G, H and I hold fixed random numeric values.

Column A is NULL in the first row; each subsequent row follows the formula "=F3+G3+((C2+D2+E2)/2)" (written here for row 3).

Column B is 1000 in the first row; each subsequent row follows the formula "=A3+(B2/2)".

Column C follows the formula "=$B2*5+(100/2)", using B from the same row.

Column D follows the formula "=$B2*5+(10/2)".

Column E follows the formula "=$B2*5+(1/2)".
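
Read row by row, these formulas form a recurrence: A and B in each row need the previous row's B, C, D and E, while C, D and E need only the same row's B. The plain-Python sketch below (no Spark involved) evaluates them in that order and can serve as a reference when checking a PySpark port. It assumes the first row's A is NULL (None here) and its B is 1000, and it reads only F and G, since H and I do not appear in any of the formulas; the function name excel_rows is hypothetical.

```python
from typing import Dict, List, Optional


def excel_rows(f: List[float], g: List[float], b0: float = 1000.0) -> List[Dict[str, Optional[float]]]:
    """Evaluate columns A-E row by row, mirroring the Excel formulas (sketch)."""
    rows: List[Dict[str, Optional[float]]] = []
    for i, (f_i, g_i) in enumerate(zip(f, g)):
        if i == 0:
            # First data row: A is NULL, B is the seed value.
            a = None
            b = b0
        else:
            prev = rows[-1]
            # =F3+G3+((C2+D2+E2)/2): current F and G plus half of the previous C+D+E
            a = f_i + g_i + (prev["C"] + prev["D"] + prev["E"]) / 2
            # =A3+(B2/2): current A plus half of the previous B
            b = a + prev["B"] / 2
        rows.append({
            "A": a,
            "B": b,
            "C": b * 5 + 100 / 2,  # =$B2*5+(100/2)
            "D": b * 5 + 10 / 2,   # =$B2*5+(10/2)
            "E": b * 5 + 1 / 2,    # =$B2*5+(1/2)
        })
    return rows
```

With the F and G values from the sample data used in the answers, row 1 comes out as A = 7679.75 and B = 8179.75, the same values both answers print.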

Screenshot of the Excel data

Could you write PySpark code that does the same?

You will have to use the pandas API on Spark to achieve this. Because each row's values depend on values computed for the previous row, the rows have to be processed one after another, so it might be slow. Here's an example solution.
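
Substituting the column formulas into one another shows the dependency concretely (n indexes the data rows, with n = 0 the first row). Because C, D and E are each 5B plus a constant, their sum collapses, and B ends up depending only on its own previous value and the current F and G:

```latex
\begin{aligned}
C_n + D_n + E_n &= (5B_n + 50) + (5B_n + 5) + (5B_n + 0.5) = 15B_n + 55.5 \\
A_n &= F_n + G_n + \tfrac{1}{2}\,(C_{n-1} + D_{n-1} + E_{n-1}) = F_n + G_n + 7.5\,B_{n-1} + 27.75 \\
B_n &= A_n + \tfrac{1}{2}\,B_{n-1} = 8\,B_{n-1} + F_n + G_n + 27.75 \qquad (n \ge 1) \\
B_n &= 8^{n} B_0 + \sum_{k=1}^{n} 8^{\,n-k}\,(F_k + G_k + 27.75)
\end{aligned}
```

For example, B_0 = 1000, F_1 = 75 and G_1 = 77 give B_1 = 8000 + 152 + 27.75 = 8179.75, matching the outputs below. With non-negative F and G, B therefore grows at least eightfold per row and passes the largest 64-bit double (about 1.8e308) after roughly 340 rows.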

import pyspark.pandas as ps
from pyspark.sql import SparkSession

# pyspark.pandas reuses the active SparkSession, so no SparkContext or
# (deprecated) SQLContext has to be created by hand.
spark = SparkSession.builder.master("local").getOrCreate()

data1 = [
    [0.0, 1000, 0.0, 0.0, 0.0, 15, 62, "ITEM147", "1"],  ## 2nd row in Excel
    [0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", "2"],   ### 3rd row in Excel and so on
    [0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", "3"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", "4"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", "5"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", "6"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", "7"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", "8"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", "9"],
]

df1Columns = ["A", "B", "C", "D", "E", "F", "G", "Product", "Steps"]

pyspark_pandas_dataframe = ps.DataFrame(data=data1, columns=df1Columns)
print("Given initial pyspark pandas dataframe")
print(pyspark_pandas_dataframe)

print("Datatypes of the columns")
print(pyspark_pandas_dataframe.dtypes)


# C, D and E depend only on the same row's B.
def set_next_C(pdf, kk):
    pdf.loc[kk, "C"] = pdf.loc[kk, "B"] * 5 + (100.0/2)


def set_next_D(pdf, kk):
    pdf.loc[kk, "D"] = pdf.loc[kk, "B"] * 5 + (10.0/2)


def set_next_E(pdf, kk):
    pdf.loc[kk, "E"] = pdf.loc[kk, "B"] * 5 + (1.0/2)


# A uses the current row's F and G plus the previous row's C, D and E.
def set_next_A(pdf, kk):
    pdf.loc[kk, "A"] = pdf.loc[kk, "F"] + pdf.loc[kk, "G"] + ((pdf.loc[kk-1, "C"] + pdf.loc[kk-1, "D"] + pdf.loc[kk-1, "E"]) / 2.0)


# B uses the current row's A plus half of the previous row's B.
def set_next_B(pdf, kk):
    pdf.loc[kk, "B"] = pdf.loc[kk, "A"] + (pdf.loc[kk - 1, "B"] / 2.0)


print("shape / dimensions of dataframe", pyspark_pandas_dataframe.shape)

row_num, col_num = pyspark_pandas_dataframe.shape

print(f"row_num : {row_num}, col_num : {col_num}")

# Walk the rows in order; each row depends on the one before it.
for cc in range(row_num):

    if cc == 0:
        # First row: A and B are given, so only C, D and E are computed.
        set_next_C(pyspark_pandas_dataframe, cc)
        set_next_D(pyspark_pandas_dataframe, cc)
        set_next_E(pyspark_pandas_dataframe, cc)
        continue

    # Order matters: A needs the previous C/D/E, B needs this row's A.
    set_next_A(pyspark_pandas_dataframe, cc)
    set_next_B(pyspark_pandas_dataframe, cc)
    set_next_C(pyspark_pandas_dataframe, cc)
    set_next_D(pyspark_pandas_dataframe, cc)
    set_next_E(pyspark_pandas_dataframe, cc)


print("FINAL Result Dataframe")
print(pyspark_pandas_dataframe)

Output:

Given initial pyspark pandas dataframe
     A       B    C    D    E   F   G  Product  Steps
0  0.0  1000.0  0.0  0.0  0.0  15  62  ITEM147      1
1  0.0     0.0  0.0  0.0  0.0  75  77  ITEM147      2
2  0.0     0.0  0.0  0.0  0.0  55  20  ITEM147      3
3  0.0     0.0  0.0  0.0  0.0  14  81  ITEM147      4
4  0.0     0.0  0.0  0.0  0.0  91  44  ITEM147      5
5  0.0     0.0  0.0  0.0  0.0  19  86  ITEM147      6
6  0.0     0.0  0.0  0.0  0.0  63  94  ITEM147      7
7  0.0     0.0  0.0  0.0  0.0  15  55  ITEM147      8
8  0.0     0.0  0.0  0.0  0.0  33  48  ITEM147      9
Datatypes of the columns
A          float64
B          float64
C          float64
D          float64
E          float64
F            int64
G            int64
Product     object
Steps       object
dtype: object
shape / dimensions of dataframe (9, 9)
row_num : 9, col_num : 9
FINAL Result Dataframe
              A             B             C             D             E   F   G  Product  Steps
0  0.000000e+00  1.000000e+03  5.050000e+03  5.005000e+03  5.000500e+03  15  62  ITEM147      1
1  7.679750e+03  8.179750e+03  4.094875e+04  4.090375e+04  4.089925e+04  75  77  ITEM147      2
2  6.145088e+04  6.554075e+04  3.277538e+05  3.277088e+05  3.277042e+05  55  20  ITEM147      3
3  4.916784e+05  5.244488e+05  2.622294e+06  2.622249e+06  2.622244e+06  14  81  ITEM147      4
4  3.933528e+06  4.195753e+06  2.097881e+07  2.097877e+07  2.097876e+07  91  44  ITEM147      5
5  3.146828e+07  3.356615e+07  1.678308e+08  1.678308e+08  1.678308e+08  19  86  ITEM147      6
6  2.517463e+08  2.685294e+08  1.342647e+09  1.342647e+09  1.342647e+09  63  94  ITEM147      7
7  2.013971e+09  2.148235e+09  1.074118e+10  1.074118e+10  1.074118e+10  15  55  ITEM147      8
8  1.611177e+10  1.718588e+10  8.592942e+10  8.592942e+10  8.592942e+10  33  48  ITEM147      9


This is a PySpark solution using pandas_udf. I'm not sure about its performance.

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# A local SparkSession (SQLContext is deprecated in favour of SparkSession).
spark = SparkSession.builder.master("local").getOrCreate()

data1 = [
    [0.0, 1000.0, 0.0, 0.0, 0.0, 15, 62, "ITEM147", 1],  ## 2nd row in Excel
    [0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", 2],     ### 3rd row in Excel and so on
    [0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", 3],
    [0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", 4],
    [0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", 5],
    [0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", 6],
    [0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", 7],
    [0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", 8],
    [0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", 9],
]

df1Columns = ["A", "B", "C", "D", "E", "F", "G", "Product", "Steps"]

dataframe = spark.createDataFrame(data=data1, schema=df1Columns)
print("Given initial pyspark pandas dataframe")
print(dataframe)

dataframe.printSchema()


# Plain Python versions of the Excel formulas.
def set_next_C(curr_b):
    return curr_b * 5 + (100.0/2)


def set_next_D(curr_b):
    return curr_b * 5 + (10.0/2)


def set_next_E(curr_b):
    return curr_b * 5 + (1.0/2)


def set_next_A(curr_f, curr_g, old_c, old_d, old_e):
    return curr_f + curr_g + ((old_c + old_d + old_e) / 2.0)


def set_next_B(curr_a, old_b):
    return curr_a + (old_b / 2.0)


# Series-to-scalar pandas UDF used as a window aggregate: for each row it
# receives the arrays of every row in the frame (first row up to the current
# row), replays the recurrence over them and returns the current row's A-E
# values joined into one "@@"-separated string.
@F.pandas_udf("string")
def multiply_aggregation(all_cols_series: pd.Series) -> str:
    # Each element is one row's array [A, B, C, D, E, F, G].
    ### Initialization from the first row in the frame
    a_old = all_cols_series.iloc[0][0]
    b_old = all_cols_series.iloc[0][1]
    c_old = d_old = e_old = None

    for cc in range(all_cols_series.size):

        if cc == 0:
            ## First row: A and B are given, so only C, D and E are computed
            c_old = set_next_C(b_old)
            d_old = set_next_D(b_old)
            e_old = set_next_E(b_old)
            continue

        ### calculate all current values
        f_current = all_cols_series.iloc[cc][5]
        g_current = all_cols_series.iloc[cc][6]
        a_current = set_next_A(f_current, g_current, c_old, d_old, e_old)
        b_current = set_next_B(a_current, b_old)
        c_current = set_next_C(b_current)
        d_current = set_next_D(b_current)
        e_current = set_next_E(b_current)

        ## current values become the "old" values for the next row
        a_old, b_old, c_old = a_current, b_current, c_current
        d_old, e_old = d_current, e_current

    list_cols_final = [str(a_old), str(b_old), str(c_old), str(d_old), str(e_old)]
    return "@@".join(list_cols_final)


# Frame: every row from the first Steps value up to the current row.
windowSpec = Window.orderBy(F.col("Steps").asc()).rowsBetween(Window.unboundedPreceding, 0)

dataframe = dataframe.withColumn("all_cols_array", F.array(["A", "B", "C", "D", "E", "F", "G"]))

calculated_columns_df = dataframe.withColumn("all_rows_calculated", multiply_aggregation(F.col("all_cols_array")).over(windowSpec))
print("Calculated Dataframe : Window Aggregation Function Applied")
calculated_columns_df.show(n=100, truncate=False)

# Split the "@@"-joined string back into the A-E columns.
split_df = calculated_columns_df.withColumn("splitted_values", F.split("all_rows_calculated", "@@"))

# "float" is Spark's 32-bit type, hence the rounding in the final table;
# cast to "double" instead to keep every digit.
split_df = split_df.withColumn("A", F.col("splitted_values").getItem(0).cast("float"))
split_df = split_df.withColumn("B", F.col("splitted_values").getItem(1).cast("float"))
split_df = split_df.withColumn("C", F.col("splitted_values").getItem(2).cast("float"))
split_df = split_df.withColumn("D", F.col("splitted_values").getItem(3).cast("float"))
split_df = split_df.withColumn("E", F.col("splitted_values").getItem(4).cast("float"))

split_df = split_df.drop(*["all_rows_calculated", "all_cols_array", "splitted_values"])

print("Final Calculated Values")
split_df.show(n=100, truncate=False)

Output:

Given initial pyspark pandas dataframe
DataFrame[A: double, B: double, C: double, D: double, E: double, F: bigint, G: bigint, Product: string, Steps: bigint]
root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)
 |-- C: double (nullable = true)
 |-- D: double (nullable = true)
 |-- E: double (nullable = true)
 |-- F: long (nullable = true)
 |-- G: long (nullable = true)
 |-- Product: string (nullable = true)
 |-- Steps: long (nullable = true)

Calculated Dataframe : Window Aggregation Function Applied

+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|A |B |C |D |E |F |G |Product|Steps|all_cols_array |all_rows_calculated |
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|0.0|1000.0|0.0|0.0|0.0|15 |62 |ITEM147|1 |[0.0, 1000.0, 0.0, 0.0, 0.0, 15.0, 62.0]|0.0@@1000.0@@5050.0@@5005.0@@5000.5 |
|0.0|0.0 |0.0|0.0|0.0|75 |77 |ITEM147|2 |[0.0, 0.0, 0.0, 0.0, 0.0, 75.0, 77.0] |7679.75@@8179.75@@40948.75@@40903.75@@40899.25 |
|0.0|0.0 |0.0|0.0|0.0|55 |20 |ITEM147|3 |[0.0, 0.0, 0.0, 0.0, 0.0, 55.0, 20.0] |61450.875@@65540.75@@327753.75@@327708.75@@327704.25 |
|0.0|0.0 |0.0|0.0|0.0|14 |81 |ITEM147|4 |[0.0, 0.0, 0.0, 0.0, 0.0, 14.0, 81.0] |491678.375@@524448.75@@2622293.75@@2622248.75@@2622244.25 |
|0.0|0.0 |0.0|0.0|0.0|91 |44 |ITEM147|5 |[0.0, 0.0, 0.0, 0.0, 0.0, 91.0, 44.0] |3933528.375@@4195752.75@@20978813.75@@20978768.75@@20978764.25 |
|0.0|0.0 |0.0|0.0|0.0|19 |86 |ITEM147|6 |[0.0, 0.0, 0.0, 0.0, 0.0, 19.0, 86.0] |31468278.375@@33566154.75@@167830823.75@@167830778.75@@167830774.25 |
|0.0|0.0 |0.0|0.0|0.0|63 |94 |ITEM147|7 |[0.0, 0.0, 0.0, 0.0, 0.0, 63.0, 94.0] |251746345.375@@268529422.75@@1342647163.75@@1342647118.75@@1342647114.25 |
|0.0|0.0 |0.0|0.0|0.0|15 |55 |ITEM147|8 |[0.0, 0.0, 0.0, 0.0, 0.0, 15.0, 55.0] |2013970768.375@@2148235479.75@@10741177448.75@@10741177403.75@@10741177399.25 |
|0.0|0.0 |0.0|0.0|0.0|33 |48 |ITEM147|9 |[0.0, 0.0, 0.0, 0.0, 0.0, 33.0, 48.0] |16111766206.875@@17185883946.75@@85929419783.75@@85929419738.75@@85929419734.25|
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+


Final Calculated Values

+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|A |B |C |D |E |F |G |Product|Steps|
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|0.0 |1000.0 |5050.0 |5005.0 |5000.5 |15 |62 |ITEM147|1 |
|7679.75 |8179.75 |40948.75 |40903.75 |40899.25 |75 |77 |ITEM147|2 |
|61450.875 |65540.75 |327753.75 |327708.75 |327704.25 |55 |20 |ITEM147|3 |
|491678.38 |524448.75 |2622293.8 |2622248.8 |2622244.2 |14 |81 |ITEM147|4 |
|3933528.5 |4195753.0 |2.0978814E7 |2.0978768E7 |2.0978764E7 |91 |44 |ITEM147|5 |
|3.1468278E7 |3.3566156E7 |1.67830816E8 |1.67830784E8 |1.67830768E8 |19 |86 |ITEM147|6 |
|2.51746352E8 |2.68529408E8 |1.34264717E9 |1.34264717E9 |1.34264717E9 |63 |94 |ITEM147|7 |
|2.01397082E9 |2.14823552E9 |1.07411773E10|1.07411773E10|1.07411773E10|15 |55 |ITEM147|8 |
|1.61117665E10|1.71858842E10|8.5929419E10 |8.5929419E10 |8.5929419E10 |33 |48 |ITEM147|9 |
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
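
Neither answer settles performance. In the pandas_udf version, Window.orderBy without partitionBy moves every row into a single partition, and because each row's frame starts at the first row, the UDF replays the whole history once per row, so total work grows quadratically with the number of rows. One alternative, not taken from either answer, is GroupedData.applyInPandas, which hands each group to a pandas function once so its rows can be walked in a single ordered pass. The sketch below assumes the rows should be computed separately per Product (the sample has only one product), that Steps is numeric and orders the rows, and that the first row of each group already holds the seed B; fill_formulas is a hypothetical name.

```python
import pandas as pd


def fill_formulas(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fill A-E for one group's rows in a single ordered pass (sketch).

    Assumes Steps orders the rows, the first row already holds the seed
    B value (1000 in the question) and F, G are the fixed inputs.
    """
    pdf = pdf.sort_values("Steps").reset_index(drop=True)
    a, b, c, d, e = [], [], [], [], []
    for i, row in enumerate(pdf.itertuples(index=False)):
        if i == 0:
            # First row: A (NULL/0.0) and B (seed) are taken as given.
            a_i, b_i = row.A, row.B
        else:
            # A = F + G + (previous C + D + E) / 2
            a_i = row.F + row.G + (c[-1] + d[-1] + e[-1]) / 2.0
            # B = A + previous B / 2
            b_i = a_i + b[-1] / 2.0
        a.append(a_i)
        b.append(b_i)
        # C, D and E only need the same row's B.
        c.append(b_i * 5 + 100 / 2)
        d.append(b_i * 5 + 10 / 2)
        e.append(b_i * 5 + 1 / 2)
    # Store A-E as float64 so they match double columns on the Spark side.
    for name, values in zip("ABCDE", (a, b, c, d, e)):
        pdf[name] = pd.Series(values, dtype="float64")
    return pdf
```

Given a Spark DataFrame sdf (hypothetical name) holding the original columns A-G, Product and Steps, it could be applied as sdf.groupBy("Product").applyInPandas(fill_formulas, schema=sdf.schema). Each group is loaded into memory on one executor, so this suits many moderately sized groups better than a single huge one.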
