You will have to use the pandas API on Spark (pyspark.pandas) to achieve this. Because each row's values depend in complex ways on values computed for previous rows, the computation is inherently sequential and may be slow. Here's an example solution.
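To make the dependency structure explicit, here is a minimal plain-Python check of the per-row recurrence used below, reproducing the Steps = 2 row of the final output (the variable names are just for illustration):
# Values carried over from the Steps = 1 row after initialization:
b_prev, c_prev, d_prev, e_prev = 1000.0, 5050.0, 5005.0, 5000.5
# Inputs of the Steps = 2 row:
f_cur, g_cur = 75, 77

a_cur = f_cur + g_cur + (c_prev + d_prev + e_prev) / 2.0   # 7679.75
b_cur = a_cur + b_prev / 2.0                               # 8179.75
c_cur = b_cur * 5 + 100.0 / 2                              # 40948.75
d_cur = b_cur * 5 + 10.0 / 2                               # 40903.75
e_cur = b_cur * 5 + 1.0 / 2                                # 40899.25
print(a_cur, b_cur, c_cur, d_cur, e_cur)
The pandas-on-Spark version of the same recurrence follows.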
from pyspark import SparkContext, SQLContext
import pyspark.pandas as ps

sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
[0.0, 1000, 0.0, 0.0, 0.0, 15, 62, "ITEM147", "1"], ## 2nd row in Excel
[0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", "2"], ### 3rd row in Excel and so on
[0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", "3"],
[0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", "4"],
[0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", "5"],
[0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", "6"],
[0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", "7"],
[0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", "8"],
[0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", "9"],
]
df1Columns = ["A","B","C","D","E","F","G","Product","Steps"]
pyspark_pandas_dataframe = ps.DataFrame(data=data1, columns=df1Columns)
print("Given initial pyspark pandas dataframe")
print(pyspark_pandas_dataframe)
print("Datatypes of the columns")
print(pyspark_pandas_dataframe.dtypes)
def set_next_C(pdf, kk):
    # C = B * 5 + 100/2, using the current row's B
    pdf.loc[kk, "C"] = pdf.loc[kk, "B"] * 5 + (100.0 / 2)

def set_next_D(pdf, kk):
    # D = B * 5 + 10/2, using the current row's B
    pdf.loc[kk, "D"] = pdf.loc[kk, "B"] * 5 + (10.0 / 2)

def set_next_E(pdf, kk):
    # E = B * 5 + 1/2, using the current row's B
    pdf.loc[kk, "E"] = pdf.loc[kk, "B"] * 5 + (1.0 / 2)

def set_next_A(pdf, kk):
    # A uses the current row's F and G plus the previous row's C, D, E
    pdf.loc[kk, "A"] = pdf.loc[kk, "F"] + pdf.loc[kk, "G"] + ((pdf.loc[kk - 1, "C"] + pdf.loc[kk - 1, "D"] + pdf.loc[kk - 1, "E"]) / 2.0)

def set_next_B(pdf, kk):
    # B uses the current row's A plus half of the previous row's B
    pdf.loc[kk, "B"] = pdf.loc[kk, "A"] + (pdf.loc[kk - 1, "B"] / 2.0)
print("shape / dimensions of dataframe", pyspark_pandas_dataframe.shape)
row_num, col_num = pyspark_pandas_dataframe.shape
print(f"row_num : {row_num}, col_num : {col_num}")
# The loop is over rows and is inherently sequential: each row needs the previous row's results.
for rr in range(row_num):
    if rr == 0:
        # First row: A and B are already given, only C, D, E have to be derived
        set_next_C(pyspark_pandas_dataframe, rr)
        set_next_D(pyspark_pandas_dataframe, rr)
        set_next_E(pyspark_pandas_dataframe, rr)
        continue
    set_next_A(pyspark_pandas_dataframe, rr)
    set_next_B(pyspark_pandas_dataframe, rr)
    set_next_C(pyspark_pandas_dataframe, rr)
    set_next_D(pyspark_pandas_dataframe, rr)
    set_next_E(pyspark_pandas_dataframe, rr)
print("FINAL Result Dataframe")
print(pyspark_pandas_dataframe)
Output :
Given initial pyspark pandas dataframe
A B C D E F G Product Steps
0 0.0 1000.0 0.0 0.0 0.0 15 62 ITEM147 1
1 0.0 0.0 0.0 0.0 0.0 75 77 ITEM147 2
2 0.0 0.0 0.0 0.0 0.0 55 20 ITEM147 3
3 0.0 0.0 0.0 0.0 0.0 14 81 ITEM147 4
4 0.0 0.0 0.0 0.0 0.0 91 44 ITEM147 5
5 0.0 0.0 0.0 0.0 0.0 19 86 ITEM147 6
6 0.0 0.0 0.0 0.0 0.0 63 94 ITEM147 7
7 0.0 0.0 0.0 0.0 0.0 15 55 ITEM147 8
8 0.0 0.0 0.0 0.0 0.0 33 48 ITEM147 9
Datatypes of the columns
A float64
B float64
C float64
D float64
E float64
F int64
G int64
Product object
Steps object
dtype: object
shape / dimensions of dataframe (9, 9)
row_num : 9, col_num : 9
FINAL Result Dataframe
A B C D E F G Product Steps
0 0.000000e+00 1.000000e+03 5.050000e+03 5.005000e+03 5.000500e+03 15 62 ITEM147 1
1 7.679750e+03 8.179750e+03 4.094875e+04 4.090375e+04 4.089925e+04 75 77 ITEM147 2
2 6.145088e+04 6.554075e+04 3.277538e+05 3.277088e+05 3.277042e+05 55 20 ITEM147 3
3 4.916784e+05 5.244488e+05 2.622294e+06 2.622249e+06 2.622244e+06 14 81 ITEM147 4
4 3.933528e+06 4.195753e+06 2.097881e+07 2.097877e+07 2.097876e+07 91 44 ITEM147 5
5 3.146828e+07 3.356615e+07 1.678308e+08 1.678308e+08 1.678308e+08 19 86 ITEM147 6
6 2.517463e+08 2.685294e+08 1.342647e+09 1.342647e+09 1.342647e+09 63 94 ITEM147 7
7 2.013971e+09 2.148235e+09 1.074118e+10 1.074118e+10 1.074118e+10 15 55 ITEM147 8
8 1.611177e+10 1.718588e+10 8.592942e+10 8.592942e+10 8.592942e+10 33 48 ITEM147 9
Here is a PySpark solution using pandas_udf. I am not sure about its performance.
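Before the full code, here is a minimal sketch of the mechanism it relies on; the toy column x and the running_sum UDF are made up for illustration only. A Series-to-scalar pandas_udf evaluated over a window running from the first row to the current row receives, for every output row, a pandas Series holding that whole frame, so the UDF can replay a sequential calculation from the start for each row:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.master("local").getOrCreate()
sdf = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 30.0)], ["Steps", "x"])

@F.pandas_udf("double")
def running_sum(x: pd.Series) -> float:
    # The whole frame (first row .. current row) arrives as one pandas Series.
    return float(x.sum())

w = Window.orderBy("Steps").rowsBetween(Window.unboundedPreceding, 0)
sdf.withColumn("cum_x", running_sum("x").over(w)).show()
# cum_x comes out as 10.0, 30.0, 60.0 -- each row sees a growing slice of the column.
The actual solution applies the same idea, packing columns A..G into an array column and returning the recomputed values for the current row as a delimited string: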
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import pandas as pd

sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
[0.0, 1000.0, 0.0, 0.0, 0.0, 15, 62, "ITEM147", 1], ## 2nd row in Excel
[0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", 2], ### 3rd row in Excel and so on
[0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", 3],
[0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", 4],
[0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", 5],
[0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", 6],
[0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", 7],
[0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", 8],
[0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", 9],
]
df1Columns = ["A", "B", "C", "D", "E", "F", "G", "Product", "Steps"]
dataframe = sqlContext.createDataFrame(data=data1, schema=df1Columns)
print("Given initial pyspark pandas dataframe")
print(dataframe)
dataframe.printSchema()
def set_next_C(curr_b):
    return curr_b * 5 + (100.0 / 2)

def set_next_D(curr_b):
    return curr_b * 5 + (10.0 / 2)

def set_next_E(curr_b):
    return curr_b * 5 + (1.0 / 2)

def set_next_A(curr_f, curr_g, old_c, old_d, old_e):
    return curr_f + curr_g + ((old_c + old_d + old_e) / 2.0)

def set_next_B(curr_a, old_b):
    return curr_a + (old_b / 2.0)
@F.pandas_udf("string")
def multiply_aggregation( all_cols_series: pd.Series ) -> str :
### Initialization
a_old = all_cols_series[0][0]
b_old = all_cols_series[0][1]
c_old = None
d_old = None
e_old = None
f_old = all_cols_series[0][5]
g_old = all_cols_series[0][6]
## Current will be updated as we go along. At first, they will be initial values
a_current = None
b_current = None
c_current = None
d_current = None
e_current = None
length_series = all_cols_series.size
for cc in range(length_series):
if cc == 0:
## First row is being initialized
c_old = set_next_C(b_old)
d_old = set_next_D(b_old)
e_old = set_next_E(b_old)
continue
### calculate all current values
f_current = all_cols_series[cc][5]
g_current = all_cols_series[cc][6]
a_current = set_next_A(f_current, g_current, c_old, d_old, e_old)
b_current = set_next_B(a_current, b_old)
c_current = set_next_C(b_current)
d_current = set_next_D(b_current)
e_current = set_next_E(b_current)
## put current values into old so that they will be used in future calculation
a_old = a_current
b_old = b_current
c_old = c_current
d_old = d_current
e_old = e_current
list_cols_final = [str(a_old), str(b_old), str(c_old), str(d_old), str(e_old)]
result_string = "@@".join(list_cols_final)
return result_string
windowSpec = Window.orderBy(F.col("Steps").asc()).rowsBetween(Window.unboundedPreceding, 0)
dataframe = dataframe.withColumn("all_cols_array", F.array(["A", "B", "C", "D", "E", "F", "G" ]))
calculated_columns_df = dataframe.withColumn("all_rows_calculated", multiply_aggregation(F.col("all_cols_array")).over(windowSpec))
print("Calculated Dataframe : Window Aggregation Function Applied")
calculated_columns_df.show(n=100, truncate=False)
split_df = calculated_columns_df.withColumn("splitted_values", F.split("all_rows_calculated", "@@"))
split_df = split_df.withColumn("A", F.col("splitted_values").getItem(0).cast("float"))
split_df = split_df.withColumn("B", F.col("splitted_values").getItem(1).cast("float"))
split_df = split_df.withColumn("C", F.col("splitted_values").getItem(2).cast("float"))
split_df = split_df.withColumn("D", F.col("splitted_values").getItem(3).cast("float"))
split_df = split_df.withColumn("E", F.col("splitted_values").getItem(4).cast("float"))
split_df = split_df.drop(*["all_rows_calculated", "all_cols_array", "splitted_values"])
print("Final Calculated Values")
split_df.show(n=100, truncate=False)
Output :
Given initial pyspark pandas dataframe
DataFrame[A: double, B: double, C: double, D: double, E: double, F: bigint, G: bigint, Product: string, Steps: bigint]
root
|-- A: double (nullable = true)
|-- B: double (nullable = true)
|-- C: double (nullable = true)
|-- D: double (nullable = true)
|-- E: double (nullable = true)
|-- F: long (nullable = true)
|-- G: long (nullable = true)
|-- Product: string (nullable = true)
|-- Steps: long (nullable = true)
Calculated Dataframe : Window Aggregation Function Applied
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|A |B |C |D |E |F |G |Product|Steps|all_cols_array |all_rows_calculated |
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|0.0|1000.0|0.0|0.0|0.0|15 |62 |ITEM147|1 |[0.0, 1000.0, 0.0, 0.0, 0.0, 15.0, 62.0]|0.0@@1000.0@@5050.0@@5005.0@@5000.5 |
|0.0|0.0 |0.0|0.0|0.0|75 |77 |ITEM147|2 |[0.0, 0.0, 0.0, 0.0, 0.0, 75.0, 77.0] |7679.75@@8179.75@@40948.75@@40903.75@@40899.25 |
|0.0|0.0 |0.0|0.0|0.0|55 |20 |ITEM147|3 |[0.0, 0.0, 0.0, 0.0, 0.0, 55.0, 20.0] |61450.875@@65540.75@@327753.75@@327708.75@@327704.25 |
|0.0|0.0 |0.0|0.0|0.0|14 |81 |ITEM147|4 |[0.0, 0.0, 0.0, 0.0, 0.0, 14.0, 81.0] |491678.375@@524448.75@@2622293.75@@2622248.75@@2622244.25 |
|0.0|0.0 |0.0|0.0|0.0|91 |44 |ITEM147|5 |[0.0, 0.0, 0.0, 0.0, 0.0, 91.0, 44.0] |3933528.375@@4195752.75@@20978813.75@@20978768.75@@20978764.25 |
|0.0|0.0 |0.0|0.0|0.0|19 |86 |ITEM147|6 |[0.0, 0.0, 0.0, 0.0, 0.0, 19.0, 86.0] |31468278.375@@33566154.75@@167830823.75@@167830778.75@@167830774.25 |
|0.0|0.0 |0.0|0.0|0.0|63 |94 |ITEM147|7 |[0.0, 0.0, 0.0, 0.0, 0.0, 63.0, 94.0] |251746345.375@@268529422.75@@1342647163.75@@1342647118.75@@1342647114.25 |
|0.0|0.0 |0.0|0.0|0.0|15 |55 |ITEM147|8 |[0.0, 0.0, 0.0, 0.0, 0.0, 15.0, 55.0] |2013970768.375@@2148235479.75@@10741177448.75@@10741177403.75@@10741177399.25 |
|0.0|0.0 |0.0|0.0|0.0|33 |48 |ITEM147|9 |[0.0, 0.0, 0.0, 0.0, 0.0, 33.0, 48.0] |16111766206.875@@17185883946.75@@85929419783.75@@85929419738.75@@85929419734.25|
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
Final Calculated Values
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|A |B |C |D |E |F |G |Product|Steps|
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|0.0 |1000.0 |5050.0 |5005.0 |5000.5 |15 |62 |ITEM147|1 |
|7679.75 |8179.75 |40948.75 |40903.75 |40899.25 |75 |77 |ITEM147|2 |
|61450.875 |65540.75 |327753.75 |327708.75 |327704.25 |55 |20 |ITEM147|3 |
|491678.38 |524448.75 |2622293.8 |2622248.8 |2622244.2 |14 |81 |ITEM147|4 |
|3933528.5 |4195753.0 |2.0978814E7 |2.0978768E7 |2.0978764E7 |91 |44 |ITEM147|5 |
|3.1468278E7 |3.3566156E7 |1.67830816E8 |1.67830784E8 |1.67830768E8 |19 |86 |ITEM147|6 |
|2.51746352E8 |2.68529408E8 |1.34264717E9 |1.34264717E9 |1.34264717E9 |63 |94 |ITEM147|7 |
|2.01397082E9 |2.14823552E9 |1.07411773E10|1.07411773E10|1.07411773E10|15 |55 |ITEM147|8 |
|1.61117665E10|1.71858842E10|8.5929419E10 |8.5929419E10 |8.5929419E10 |33 |48 |ITEM147|9 |
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+