I am running the following code:
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False),
    T.StructField("col_2", T.IntegerType(), False),
    T.StructField("measure_1", T.FloatType(), False),
    T.StructField("measure_2", T.FloatType(), False),
])
data = [
    {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
    {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
df.show()
"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""
group_cols = ["col_1", "col_2"]
measure_cols = ["measure_1", "measure_2"]
for col in measure_cols:
    stats = df.groupBy(group_cols).agg(
        F.max(col).alias("max_" + col),
        F.avg(col).alias("avg_" + col),
    )
    df = df.join(stats, group_cols)
df.show()
"""
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
|col_1|col_2|measure_1|measure_2|max_measure_1|avg_measure_1|max_measure_2|avg_measure_2|
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
"""
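The problem shows up when my initial df is not this simple but is actually the result of a series of joins or other operations. When I look at my job, I notice that df appears to be derived several times as my groupBy operations execute. The query plan for the simple case here is: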
df.explain()
"""
>>> df.explain()
== Physical Plan ==
*(11) Project [col_1#26, col_2#27, measure_1#28, measure_2#29, max_measure_1#56, avg_measure_1#58, max_measure_2#80, avg_measure_2#82]
+- *(11) SortMergeJoin [col_1#26, col_2#27], [col_1#87, col_2#88], Inner
:- *(5) Project [col_1#26, col_2#27, measure_1#28, measure_2#29, max_measure_1#56, avg_measure_1#58]
: +- *(5) SortMergeJoin [col_1#26, col_2#27], [col_1#63, col_2#64], Inner
: :- *(2) Sort [col_1#26 ASC NULLS FIRST, col_2#27 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#26, col_2#27, 200), ENSURE_REQUIREMENTS, [id=#276]
: : +- *(1) Scan ExistingRDD[col_1#26,col_2#27,measure_1#28,measure_2#29]
: +- *(4) Sort [col_1#63 ASC NULLS FIRST, col_2#64 ASC NULLS FIRST], false, 0
: +- *(4) HashAggregate(keys=[col_1#63, col_2#64], functions=[max(measure_1#65), avg(cast(measure_1#65 as double))])
: +- Exchange hashpartitioning(col_1#63, col_2#64, 200), ENSURE_REQUIREMENTS, [id=#282]
: +- *(3) HashAggregate(keys=[col_1#63, col_2#64], functions=[partial_max(measure_1#65), partial_avg(cast(measure_1#65 as double))])
: +- *(3) Project [col_1#63, col_2#64, measure_1#65]
: +- *(3) Scan ExistingRDD[col_1#63,col_2#64,measure_1#65,measure_2#66]
+- *(10) Sort [col_1#87 ASC NULLS FIRST, col_2#88 ASC NULLS FIRST], false, 0
+- *(10) HashAggregate(keys=[col_1#87, col_2#88], functions=[max(measure_2#90), avg(cast(measure_2#90 as double))])
+- *(10) HashAggregate(keys=[col_1#87, col_2#88], functions=[partial_max(measure_2#90), partial_avg(cast(measure_2#90 as double))])
+- *(10) Project [col_1#87, col_2#88, measure_2#90]
+- *(10) SortMergeJoin [col_1#87, col_2#88], [col_1#63, col_2#64], Inner
:- *(7) Sort [col_1#87 ASC NULLS FIRST, col_2#88 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#87, col_2#88, 200), ENSURE_REQUIREMENTS, [id=#293]
: +- *(6) Project [col_1#87, col_2#88, measure_2#90]
: +- *(6) Scan ExistingRDD[col_1#87,col_2#88,measure_1#89,measure_2#90]
+- *(9) Sort [col_1#63 ASC NULLS FIRST, col_2#64 ASC NULLS FIRST], false, 0
+- *(9) HashAggregate(keys=[col_1#63, col_2#64], functions=[])
+- Exchange hashpartitioning(col_1#63, col_2#64, 200), ENSURE_REQUIREMENTS, [id=#299]
+- *(8) HashAggregate(keys=[col_1#63, col_2#64], functions=[])
+- *(8) Project [col_1#63, col_2#64]
+- *(8) Scan ExistingRDD[col_1#63,col_2#64,measure_1#65,measure_2#66]
"""
But if, for example, I change the code above so that the initial df is the result of a join and a union:
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False),
    T.StructField("col_2", T.IntegerType(), False),
    T.StructField("measure_1", T.FloatType(), False),
    T.StructField("measure_2", T.FloatType(), False),
])
data = [
    {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
    {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
    {"col_1": 1},
    {"col_1": 1},
    {"col_1": 2},
    {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)
df = df.unionByName(df)
df = df.join(right_df, on="col_1")
df.show()
"""
+-----+-----+---------+---------+
|col_1|col_2|measure_1|measure_2|
+-----+-----+---------+---------+
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    1|    2|      0.5|      1.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
|    2|    3|      2.5|      3.5|
+-----+-----+---------+---------+
"""
df.explain()
"""
== Physical Plan ==
*(7) Project [col_1#299, col_2#300, measure_1#301, measure_2#302, col_2#354, measure_1#355, measure_2#356]
+- *(7) SortMergeJoin [col_1#299], [col_1#353], Inner
:- *(3) Sort [col_1#299 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#299, 200), ENSURE_REQUIREMENTS, [id=#595]
: +- Union
: :- *(1) Scan ExistingRDD[col_1#299,col_2#300,measure_1#301,measure_2#302]
: +- *(2) Scan ExistingRDD[col_1#299,col_2#300,measure_1#301,measure_2#302]
+- *(6) Sort [col_1#353 ASC NULLS FIRST], false, 0
+- ReusedExchange [col_1#353, col_2#354, measure_1#355, measure_2#356], Exchange hashpartitioning(col_1#299, 200), ENSURE_REQUIREMENTS, [id=#595]
"""
group_cols = ["col_1", "col_2"]
measure_cols = ["measure_1", "measure_2"]
for col in measure_cols:
    stats = df.groupBy(group_cols).agg(
        F.max(col).alias("max_" + col),
        F.avg(col).alias("avg_" + col),
    )
    df = df.join(stats, group_cols)
df.show()
"""
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
|col_1|col_2|measure_1|measure_2|max_measure_1|avg_measure_1|max_measure_2|avg_measure_2|
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
"""
df.explain()
"""
== Physical Plan ==
*(31) Project [col_1#404, col_2#405, measure_1#406, measure_2#407, max_measure_1#465, avg_measure_1#467, max_measure_2#489, avg_measure_2#491]
+- *(31) SortMergeJoin [col_1#404, col_2#405], [col_1#496, col_2#497], Inner
:- *(15) Project [col_1#404, col_2#405, measure_1#406, measure_2#407, max_measure_1#465, avg_measure_1#467]
: +- *(15) SortMergeJoin [col_1#404, col_2#405], [col_1#472, col_2#473], Inner
: :- *(7) Sort [col_1#404 ASC NULLS FIRST, col_2#405 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#404, col_2#405, 200), ENSURE_REQUIREMENTS, [id=#1508]
: : +- *(6) Project [col_1#404, col_2#405, measure_1#406, measure_2#407]
: : +- *(6) SortMergeJoin [col_1#404], [col_1#412], Inner
: : :- *(3) Sort [col_1#404 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(col_1#404, 200), ENSURE_REQUIREMENTS, [id=#1494]
: : : +- Union
: : : :- *(1) Scan ExistingRDD[col_1#404,col_2#405,measure_1#406,measure_2#407]
: : : +- *(2) Scan ExistingRDD[col_1#404,col_2#405,measure_1#406,measure_2#407]
: : +- *(5) Sort [col_1#412 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#412, 200), ENSURE_REQUIREMENTS, [id=#1500]
: : +- *(4) Scan ExistingRDD[col_1#412]
: +- *(14) Sort [col_1#472 ASC NULLS FIRST, col_2#473 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#472, col_2#473, 200), ENSURE_REQUIREMENTS, [id=#1639]
: +- *(13) HashAggregate(keys=[col_1#472, col_2#473], functions=[max(measure_1#474), avg(cast(measure_1#474 as double))])
: +- *(13) HashAggregate(keys=[col_1#472, col_2#473], functions=[partial_max(measure_1#474), partial_avg(cast(measure_1#474 as double))])
: +- *(13) Project [col_1#472, col_2#473, measure_1#474]
: +- *(13) SortMergeJoin [col_1#472], [col_1#412], Inner
: :- *(10) Sort [col_1#472 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#472, 200), ENSURE_REQUIREMENTS, [id=#1516]
: : +- Union
: : :- *(8) Project [col_1#472, col_2#473, measure_1#474]
: : : +- *(8) Scan ExistingRDD[col_1#472,col_2#473,measure_1#474,measure_2#475]
: : +- *(9) Project [col_1#472, col_2#473, measure_1#474]
: : +- *(9) Scan ExistingRDD[col_1#472,col_2#473,measure_1#474,measure_2#475]
: +- *(12) Sort [col_1#412 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#412], Exchange hashpartitioning(col_1#412, 200), ENSURE_REQUIREMENTS, [id=#1500]
+- *(30) Sort [col_1#496 ASC NULLS FIRST, col_2#497 ASC NULLS FIRST], false, 0
+- *(30) HashAggregate(keys=[col_1#496, col_2#497], functions=[max(measure_2#499), avg(cast(measure_2#499 as double))])
+- *(30) HashAggregate(keys=[col_1#496, col_2#497], functions=[partial_max(measure_2#499), partial_avg(cast(measure_2#499 as double))])
+- *(30) Project [col_1#496, col_2#497, measure_2#499]
+- *(30) SortMergeJoin [col_1#496, col_2#497], [col_1#472, col_2#473], Inner
:- *(22) Sort [col_1#496 ASC NULLS FIRST, col_2#497 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#496, col_2#497, 200), ENSURE_REQUIREMENTS, [id=#1660]
: +- *(21) Project [col_1#496, col_2#497, measure_2#499]
: +- *(21) SortMergeJoin [col_1#496], [col_1#412], Inner
: :- *(18) Sort [col_1#496 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#496, 200), ENSURE_REQUIREMENTS, [id=#1544]
: : +- Union
: : :- *(16) Project [col_1#496, col_2#497, measure_2#499]
: : : +- *(16) Scan ExistingRDD[col_1#496,col_2#497,measure_1#498,measure_2#499]
: : +- *(17) Project [col_1#496, col_2#497, measure_2#499]
: : +- *(17) Scan ExistingRDD[col_1#496,col_2#497,measure_1#498,measure_2#499]
: +- *(20) Sort [col_1#412 ASC NULLS FIRST], false, 0
: +- ReusedExchange [col_1#412], Exchange hashpartitioning(col_1#412, 200), ENSURE_REQUIREMENTS, [id=#1500]
+- *(29) Sort [col_1#472 ASC NULLS FIRST, col_2#473 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col_1#472, col_2#473, 200), ENSURE_REQUIREMENTS, [id=#1707]
+- *(28) HashAggregate(keys=[col_1#472, col_2#473], functions=[])
+- *(28) HashAggregate(keys=[col_1#472, col_2#473], functions=[])
+- *(28) Project [col_1#472, col_2#473]
+- *(28) SortMergeJoin [col_1#472], [col_1#412], Inner
:- *(25) Sort [col_1#472 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#472, 200), ENSURE_REQUIREMENTS, [id=#1566]
: +- Union
: :- *(23) Project [col_1#472, col_2#473]
: : +- *(23) Scan ExistingRDD[col_1#472,col_2#473,measure_1#474,measure_2#475]
: +- *(24) Project [col_1#472, col_2#473]
: +- *(24) Scan ExistingRDD[col_1#472,col_2#473,measure_1#474,measure_2#475]
+- *(27) Sort [col_1#412 ASC NULLS FIRST], false, 0
+- ReusedExchange [col_1#412], Exchange hashpartitioning(col_1#412, 200), ENSURE_REQUIREMENTS, [id=#1500]
"""
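You can see in the query plan that the join + union is derived multiple times. This is reflected in my job execution report, where I see stages with the same number of tasks running again and again.
How can I stop this re-derivation from happening?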
Best answer
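Inside the loop of your transformation, you join against and derive columns from the same base DataFrame several times, so it would benefit from PySpark's .cache() function. This explicitly tells Spark to keep the derived DataFrame instead of recomputing it. The cache is filled lazily, by the first action that touches the DataFrame; from then on the initial union + join is not recomputed, and later transformations reuse the stored result.
It is a one-line addition that will greatly benefit your execution.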
from pyspark.sql import types as T, functions as F, SparkSession
spark = SparkSession.builder.getOrCreate()
schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False),
    T.StructField("col_2", T.IntegerType(), False),
    T.StructField("measure_1", T.FloatType(), False),
    T.StructField("measure_2", T.FloatType(), False),
])
data = [
    {"col_1": 1, "col_2": 2, "measure_1": 0.5, "measure_2": 1.5},
    {"col_1": 2, "col_2": 3, "measure_1": 2.5, "measure_2": 3.5}
]
df = spark.createDataFrame(data, schema)
right_schema = T.StructType([
    T.StructField("col_1", T.IntegerType(), False)
])
right_data = [
    {"col_1": 1},
    {"col_1": 1},
    {"col_1": 2},
    {"col_1": 2}
]
right_df = spark.createDataFrame(right_data, right_schema)
df = df.unionByName(df)
df = df.join(right_df, on="col_1")
# ========= Added this line BEFORE the loop
df = df.cache()
# =========
group_cols = ["col_1", "col_2"]
measure_cols = ["measure_1", "measure_2"]
for col in measure_cols:
    stats = df.groupBy(group_cols).agg(
        F.max(col).alias("max_" + col),
        F.avg(col).alias("avg_" + col),
    )
    df = df.join(stats, group_cols)
df.show()
"""
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
|col_1|col_2|measure_1|measure_2|max_measure_1|avg_measure_1|max_measure_2|avg_measure_2|
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    2|    3|      2.5|      3.5|          2.5|          2.5|          3.5|          3.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
|    1|    2|      0.5|      1.5|          0.5|          0.5|          1.5|          1.5|
+-----+-----+---------+---------+-------------+-------------+-------------+-------------+
"""
df.explain()
"""
>>> df.explain()
== Physical Plan ==
*(4) Project [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268, max_measure_1#1312, avg_measure_1#1314, max_measure_2#1336, avg_measure_2#1338]
+- *(4) BroadcastHashJoin [col_1#1265, col_2#1266], [col_1#1343, col_2#1344], Inner, BuildRight, false
:- *(4) Project [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268, max_measure_1#1312, avg_measure_1#1314]
: +- *(4) BroadcastHashJoin [col_1#1265, col_2#1266], [col_1#1319, col_2#1320], Inner, BuildLeft, false
: :- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))),false), [id=#2439]
: : +- *(1) ColumnarToRow
: : +- InMemoryTableScan [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268]
: : +- InMemoryRelation [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268], StorageLevel(disk, memory, deserialized, 1 replicas)
: : +- *(6) Project [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268]
: : +- *(6) SortMergeJoin [col_1#1265], [col_1#1273], Inner
: : :- *(3) Sort [col_1#1265 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(col_1#1265, 200), ENSURE_REQUIREMENTS, [id=#2169]
: : : +- Union
: : : :- *(1) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: : : +- *(2) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: : +- *(5) Sort [col_1#1273 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1273, 200), ENSURE_REQUIREMENTS, [id=#2175]
: : +- *(4) Scan ExistingRDD[col_1#1273]
: +- *(4) HashAggregate(keys=[col_1#1319, col_2#1320], functions=[max(measure_1#1321), avg(cast(measure_1#1321 as double))])
: +- *(4) HashAggregate(keys=[col_1#1319, col_2#1320], functions=[partial_max(measure_1#1321), partial_avg(cast(measure_1#1321 as double))])
: +- *(4) ColumnarToRow
: +- InMemoryTableScan [col_1#1319, col_2#1320, measure_1#1321]
: +- InMemoryRelation [col_1#1319, col_2#1320, measure_1#1321, measure_2#1322], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(6) Project [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268]
: +- *(6) SortMergeJoin [col_1#1265], [col_1#1273], Inner
: :- *(3) Sort [col_1#1265 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1265, 200), ENSURE_REQUIREMENTS, [id=#2169]
: : +- Union
: : :- *(1) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: : +- *(2) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: +- *(5) Sort [col_1#1273 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1273, 200), ENSURE_REQUIREMENTS, [id=#2175]
: +- *(4) Scan ExistingRDD[col_1#1273]
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))),false), [id=#2461]
+- *(3) HashAggregate(keys=[col_1#1343, col_2#1344], functions=[max(measure_2#1346), avg(cast(measure_2#1346 as double))])
+- *(3) HashAggregate(keys=[col_1#1343, col_2#1344], functions=[partial_max(measure_2#1346), partial_avg(cast(measure_2#1346 as double))])
+- *(3) Project [col_1#1343, col_2#1344, measure_2#1346]
+- *(3) BroadcastHashJoin [col_1#1343, col_2#1344], [col_1#1319, col_2#1320], Inner, BuildRight, false
:- *(3) ColumnarToRow
: +- InMemoryTableScan [col_1#1343, col_2#1344, measure_2#1346]
: +- InMemoryRelation [col_1#1343, col_2#1344, measure_1#1345, measure_2#1346], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(6) Project [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268]
: +- *(6) SortMergeJoin [col_1#1265], [col_1#1273], Inner
: :- *(3) Sort [col_1#1265 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(col_1#1265, 200), ENSURE_REQUIREMENTS, [id=#2169]
: : +- Union
: : :- *(1) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: : +- *(2) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: +- *(5) Sort [col_1#1273 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1273, 200), ENSURE_REQUIREMENTS, [id=#2175]
: +- *(4) Scan ExistingRDD[col_1#1273]
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))),false), [id=#2454]
+- *(2) HashAggregate(keys=[col_1#1319, col_2#1320], functions=[])
+- *(2) HashAggregate(keys=[col_1#1319, col_2#1320], functions=[])
+- *(2) ColumnarToRow
+- InMemoryTableScan [col_1#1319, col_2#1320]
+- InMemoryRelation [col_1#1319, col_2#1320, measure_1#1321, measure_2#1322], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(6) Project [col_1#1265, col_2#1266, measure_1#1267, measure_2#1268]
+- *(6) SortMergeJoin [col_1#1265], [col_1#1273], Inner
:- *(3) Sort [col_1#1265 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(col_1#1265, 200), ENSURE_REQUIREMENTS, [id=#2169]
: +- Union
: :- *(1) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
: +- *(2) Scan ExistingRDD[col_1#1265,col_2#1266,measure_1#1267,measure_2#1268]
+- *(5) Sort [col_1#1273 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col_1#1273, 200), ENSURE_REQUIREMENTS, [id=#2175]
+- *(4) Scan ExistingRDD[col_1#1273]
"""
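You can now see in the query plan that an InMemoryTableScan over an InMemoryRelation is used in place of the repeated shuffles, and your job execution will reflect the same.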
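Note: .cache() does not rewrite or truncate the lineage of your query (the original union + join still appears beneath each InMemoryRelation in the plan above); it only changes how your data is created and reused.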
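As a rough mental model of why the uncached loop re-derives the base DataFrame, consider the toy lineage graph below. It is not Spark's implementation: `Node`, `run_loop`, and the step names are invented for illustration, and the sketch only counts how often the base step is evaluated. Each loop pass references df twice (once for the aggregate, once for the join), so without caching the number of base evaluations doubles per measure column.

```python
class Node:
    """A lazily evaluated step in a toy lineage graph (not a Spark class)."""

    def __init__(self, name, parents=(), counter=None):
        self.name = name
        self.parents = tuple(parents)
        self.counter = counter if counter is not None else {}
        self._cached = False
        self._materialized = False

    def cache(self):
        # Mark the node so that its first evaluation is remembered.
        self._cached = True
        return self

    def evaluate(self):
        # A cached node that has already run is reused, parents included.
        if self._cached and self._materialized:
            return
        for parent in self.parents:
            parent.evaluate()
        self.counter[self.name] = self.counter.get(self.name, 0) + 1
        self._materialized = True


def run_loop(n_measures, use_cache):
    """Mimic the groupBy + join loop; return how often the base step ran."""
    counter = {}
    base = Node("union+join", counter=counter)
    if use_cache:
        base.cache()
    df = base
    for i in range(n_measures):
        stats = Node(f"agg_{i}", parents=[df], counter=counter)
        df = Node(f"join_{i}", parents=[df, stats], counter=counter)
    df.evaluate()  # stands in for an action such as df.show()
    return counter["union+join"]


print(run_loop(2, use_cache=False))  # 4
print(run_loop(2, use_cache=True))   # 1
```

The count of 4 for two measure columns lines up with the four Union subtrees in the uncached physical plan from the question, and the count of 1 corresponds to the single materialization behind the InMemoryRelation.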
Regarding "pyspark - Why do I see repeated materializations of a DataFrame in my build?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/68474926/