- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我很难理解为什么我无法运行一个转换,在等待几分钟(有时几个小时)后,返回错误“序列化结果太大”。
在转换中,我有一个日期列表,我在 for 循环中迭代这些日期以在特定时间间隔内进行增量计算。
预期的数据集是迭代数据集的并集,应该包含 450k 行,不会太多,但我有很多计算阶段、任务和尝试!!
配置文件已设置为中配置文件,我无法在其他配置文件上缩放,也无法设置 maxResultSize = 0。
代码示例:
Date_list = [All weeks from: '2021-01-01', to: '2022-01-01'] --> ~50 elements
df_total = spark.createDataframe([], schema)
df_date = []
for date in Date_list:
tmp = df.filter(between [date, date-7days]).withColumn('example', F.lit(date))
........
df2 = df.join(tmp, 'column', 'inner').......
df_date += [df2]
df_total = df_total.unionByName(union_many(*df_date))
return df_total
不要注意语法。这只是一个例子,说明循环内部有一系列的操作。 我想要的输出是一个数据框,其中包含每次迭代的数据框!
谢谢!!
最佳答案
您遇到了 Spark 的已知限制,类似于在 here 上讨论的发现.
但是,有一些方法可以通过重新考虑您的实现来解决这个问题,而不是成为一系列描述您希望操作的数据批处理的分派(dispatch)指令,类似于您创建 tmp
数据框。
不幸的是,这可能需要做更多的工作才能以这种方式重新思考您的逻辑,因为您会想将您的操作纯粹想象为提供给 PySpark 的一系列列操作命令,而不是逐行操作。有些操作不能仅使用 PySpark 调用来完成,因此这并不总是可行的。一般来说,值得仔细考虑。
例如,您的数据范围计算可以完全在 PySpark 中执行,如果您在多年或其他更大的规模上执行此操作,速度将会大大加快。我们没有使用 Python 列表理解或其他逻辑,而是对一小组初始数据使用列操作来构建我们的范围。
我在这里写了一些关于如何创建日期批处理的示例代码,这应该让您执行 join
来创建您的 tmp
DataFrame,之后您可以描述您希望对其执行的操作类型。
创建日期范围的代码(一年中每周的开始和结束日期):
from pyspark.sql import types as T, functions as F, SparkSession, Window
from datetime import date
spark = SparkSession.builder.getOrCreate()
year_marker_schema = T.StructType([
T.StructField("max_year", T.IntegerType(), False),
])
year_marker_data = [
{"max_year": 2022}
]
year_marker_df = spark.createDataFrame(year_marker_data, year_marker_schema)
year_marker_df.show()
"""
+--------+
|max_year|
+--------+
| 2022|
+--------+
"""
previous_week_window = Window.partitionBy(F.col("start_year")).orderBy("start_week_index")
year_marker_df = year_marker_df.select(
(F.col("max_year") - 1).alias("start_year"),
"*"
).select(
F.to_date(F.col("max_year").cast(T.StringType()), "yyyy").alias("max_year_date"),
F.to_date(F.col("start_year").cast(T.StringType()), "yyyy").alias("start_year_date"),
"*"
).select(
F.datediff(F.col("max_year_date"), F.col("start_year_date")).alias("days_between"),
"*"
).select(
F.floor(F.col("days_between") / 7).alias("weeks_between"),
"*"
).select(
F.sequence(F.lit(0), F.col("weeks_between")).alias("week_indices"),
"*"
).select(
F.explode(F.col("week_indices")).alias("start_week_index"),
"*"
).select(
F.lead(F.col("start_week_index"), 1).over(previous_week_window).alias("end_week_index"),
"*"
).select(
((F.col("start_week_index") * 7) + 1).alias("start_day"),
((F.col("end_week_index") * 7) + 1).alias("end_day"),
"*"
).select(
F.concat_ws(
"-",
F.col("start_year"),
F.col("start_day").cast(T.StringType())
).alias("start_day_string"),
F.concat_ws(
"-",
F.col("start_year"),
F.col("end_day").cast(T.StringType())
).alias("end_day_string"),
"*"
).select(
F.to_date(
F.col("start_day_string"),
"yyyy-D"
).alias("start_date"),
F.to_date(
F.col("end_day_string"),
"yyyy-D"
).alias("end_date"),
"*"
)
year_marker_df.drop(
"max_year",
"start_year",
"weeks_between",
"days_between",
"week_indices",
"max_year_date",
"start_day_string",
"end_day_string",
"start_day",
"end_day",
"start_week_index",
"end_week_index",
"start_year_date"
).show()
"""
+----------+----------+
|start_date| end_date|
+----------+----------+
|2021-01-01|2021-01-08|
|2021-01-08|2021-01-15|
|2021-01-15|2021-01-22|
|2021-01-22|2021-01-29|
|2021-01-29|2021-02-05|
|2021-02-05|2021-02-12|
|2021-02-12|2021-02-19|
|2021-02-19|2021-02-26|
|2021-02-26|2021-03-05|
|2021-03-05|2021-03-12|
|2021-03-12|2021-03-19|
|2021-03-19|2021-03-26|
|2021-03-26|2021-04-02|
|2021-04-02|2021-04-09|
|2021-04-09|2021-04-16|
|2021-04-16|2021-04-23|
|2021-04-23|2021-04-30|
|2021-04-30|2021-05-07|
|2021-05-07|2021-05-14|
|2021-05-14|2021-05-21|
+----------+----------+
only showing top 20 rows
"""
一旦您拥有此代码并且如果您无法单独通过联接/列派生来表达您的工作并且被迫使用union_many
执行操作,您可以考虑使用 Spark 的 localCheckpoint df2
结果中的特征。这将允许 Spark 简单地计算结果 DataFrame 而不是将其查询计划添加到您将推送到 df_total
的结果中。这可以与 cache 配对还将生成的 DataFrame 保存在内存中,但这将取决于您的数据规模。
localCheckpoint
和 cache
可用于避免多次重新计算相同的 DataFrame 并截断在中间 DataFrame 之上完成的查询计划量。
您可能会发现 localCheckpoint
和 cache
对您的 df
DataFrame 也很有用,因为它将在您的循环(假设您无法重新处理逻辑以使用基于 SQL 的操作,而是仍然被迫使用循环)。
作为何时使用每个的快速而粗略的总结:
在计算复杂且稍后将在操作中使用的 DataFrame 上使用 localCheckpoint
。通常,这些节点会馈入union
在以后要多次使用的 DataFrame 上使用 cache
。这通常是位于 for/while 循环之外的 DataFrame,将在循环中调用
你的初始代码
Date_list = [All weeks from: '2021-01-01', to: '2022-01-01'] --> ~50 elements
df_total = spark.createDataframe([], schema)
df_date = []
for date in Date_list:
tmp = df.filter(between [date, date-7days]).withColumn('example', F.lit(date))
........
df2 = df.join(tmp, 'column', 'inner').......
df_date += [df2]
df_total = df_total.unionByName(union_many(*df_date))
return df_total
现在应该是这样的:
# year_marker_df as derived in my code above
year_marker_df = year_marker_df.cache()
df = df.join(year_marker_df, df.my_date_column between year_marker_df.start_date, year_marker_df.end_date)
# Other work previously in your for_loop, resulting in df_total
return df_total
或者,如果您无法重新处理内部循环操作,您可以进行一些优化,例如:
Date_list = [All weeks from: '2021-01-01', to: '2022-01-01'] --> ~50 elements
df_total = spark.createDataframe([], schema)
df_date = []
df = df.cache()
for date in Date_list:
tmp = df.filter(between [date, date-7days]).withColumn('example', F.lit(date))
........
df2 = df.join(tmp, 'column', 'inner').......
df2 = df2.localCheckpoint()
df_date += [df2]
df_total = df_total.unionByName(union_many(*df_date))
return df_total
关于PySpark 序列化结果 OOM 对于 Spark 中的循环来说太大,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70814220/
我是 PHP 新手。我一直在脚本中使用 for 循环、while 循环、foreach 循环。我想知道 哪个性能更好? 选择循环的标准是什么? 当我们在另一个循环中循环时应该使用哪个? 我一直想知道要
我在高中的编程课上,我的作业是制作一个基本的小计和顶级计算器,但我在一家餐馆工作,所以制作一个只能让你在一种食物中读到。因此,我尝试让它能够接收多种食品并将它们添加到一个价格变量中。抱歉,如果某些代码
这是我正在学习的一本教科书。 var ingredients = ["eggs", "milk", "flour", "sugar", "baking soda", "baking powder",
我正在从字符串中提取数字并将其传递给函数。我想给它加 1,然后返回字符串,同时保留前导零。我可以使用 while 循环来完成此操作,但不能使用 for 循环。 for 循环只是跳过零。 var add
编辑:我已经在程序的输出中进行了编辑。 该程序要求估计给定值 mu。用户给出一个值 mu,同时还提供了四个不等于 1 的不同数字(称为 w、x、y、z)。然后,程序尝试使用 de Jaeger 公式找
我正在编写一个算法,该算法对一个整数数组从末尾到开头执行一个大循环,其中包含一个 if 条件。第一次条件为假时,循环可以终止。 因此,对于 for 循环,如果条件为假,它会继续迭代并进行简单的变量更改
现在我已经习惯了在内存非常有限的情况下进行编程,但我没有答案的一个问题是:哪个内存效率更高;- for(;;) 或 while() ?还是它们可以平等互换?如果有的话,还要对效率问题发表评论! 最佳答
这个问题已经有答案了: How do I compare strings in Java? (23 个回答) 已关闭 8 年前。 我正在尝试创建一个小程序,我可以在其中读取该程序的单词。如果单词有 6
这个问题在这里已经有了答案: python : list index out of range error while iteratively popping elements (12 个答案) 关
我正在尝试向用户请求 4 到 10 之间的整数。如果他们回答超出该范围,它将进入循环。当用户第一次正确输入数字时,它不会中断并继续执行 else 语句。如果用户在 else 语句中正确输入数字,它将正
我尝试创建一个带有嵌套 foreach 循环的列表。第一个循环是循环一些数字,第二个循环是循环日期。我想给一个日期写一个数字。所以还有另一个功能来检查它。但结果是数字多次写入日期。 Out 是这样的:
我想要做的事情是使用循环创建一个数组,然后在另一个类中调用该数组,这不会做,也可能永远不会做。解决这个问题最好的方法是什么?我已经寻找了所有解决方案,但它们无法编译。感谢您的帮助。 import ja
我尝试创建一个带有嵌套 foreach 循环的列表。第一个循环是循环一些数字,第二个循环是循环日期。我想给一个日期写一个数字。所以还有另一个功能来检查它。但结果是数字多次写入日期。 Out 是这样的:
我正在模拟一家快餐店三个多小时。这三个小时分为 18 个间隔,每个间隔 600 秒。每个间隔都会输出有关这 600 秒内发生的情况的统计信息。 我原来的结构是这样的: int i; for (i=0;
这个问题已经有答案了: IE8 for...in enumerator (3 个回答) How do I check if an object has a specific property in J
哪个对性能更好?这可能与其他编程语言不一致,所以如果它们不同,或者如果你能用你对特定语言的知识回答我的问题,请解释。 我将使用 c++ 作为示例,但我想知道它在 java、c 或任何其他主流语言中的工
这个问题不太可能帮助任何 future 的访问者;它只与一个小的地理区域、一个特定的时间点或一个非常狭窄的情况有关,这些情况并不普遍适用于互联网的全局受众。为了帮助使这个问题更广泛地适用,visit
我是 C 编程和编写代码的新手,以确定 M 测试用例的质因数分解。如果我一次只扫描一次,该功能本身就可以工作,但是当我尝试执行 M 次时却惨遭失败。 我不知道为什么 scanf() 循环有问题。 in
这个问题已经有答案了: JavaScript by reference vs. by value [duplicate] (4 个回答) 已关闭 3 年前。 我在使用 TSlint 时遇到问题,并且理
我尝试在下面的代码中添加 foreach 或 for 循环,以便为 Charts.js 创建多个数据集。这将允许我在此折线图上创建多条线。 我有一个 PHP 对象,我可以对其进行编码以稍后填充变量,但
我是一名优秀的程序员,十分优秀!