I have just started experimenting with pyspark/Spark and have run into a problem where my code does not work. I cannot find the issue, and Spark's error output is not very helpful either. I did find some questions on Stack Overflow that look the same, but none of them has a clear answer or solution (at least not for me).
The code I am trying to run is:
import json
from datetime import datetime, timedelta

from pyspark.sql.session import SparkSession

from parse.data_reader import read_csv
from parse.interpolate import insert_time_range, create_time_range, linear_interpolate

spark = SparkSession.builder.getOrCreate()

df = None
with open('config/data_sources.json') as sources_file:
    sources = json.load(sources_file)
    for file in sources['files']:
        with open('config/mappings/{}.json'.format(file['mapping'])) as mapping:
            df_to_append = read_csv(
                spark=spark,
                file='{}{}'.format(sources['root_path'], file['name']),
                config=json.load(mapping)
            )
            if df is None:
                df = df_to_append
            else:
                df = df.union(df_to_append)

df.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

time_range = create_time_range(
    datetime(year=2019, month=7, day=1, hour=0),
    datetime(year=2019, month=7, day=8, hour=0),
    timedelta(seconds=3600)
)

df_with_intervals = insert_time_range(
    df=df,
    timestamp_column_name='Timestamp',
    variable_column_name='Variable',
    value_column_name='Value',
    time_range=time_range,
)

df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/04 13:31:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/04 13:31:36 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 4:=======================> (2 + 3) / 5]19/09/04 13:31:52 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://xxxxxx.azuredatabricks.net/?o=xxxxxx#/setting/clusters/xxxxxx/sparkUi
[Stage 5:===========> (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp |Variable |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0 |
|2019-07-01 00:00:06.664|Load % SB DG|0.0 |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows
Traceback (most recent call last):
File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 42, in <module>
df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
print(self._jdf.showString(n, int(truncate), vertical))
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o655.showString.
: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
at com.trueaccord.scalapb.textformat.TextGenerator.addNewLine(TextGenerator.scala:33)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:38)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
Process finished with exit code 1
The relevant helpers in parse.interpolate are:

from typing import Iterable
from datetime import datetime, timedelta

from pyspark.sql import DataFrame
from pyspark.sql.functions import array, explode, lit


def create_time_range(start_time: datetime, end_time: datetime, step_size: timedelta) -> Iterable[datetime]:
    return [start_time + step_size * n for n in range(int((end_time - start_time) / step_size))]


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, explode(time_range))
    return df.union(df_exploded.select([timestamp_column_name, variable_column_name, value_column_name]))
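As a standalone sanity check of the range length (stdlib only, reproducing the create_time_range logic above): the week from 2019-07-01 to 2019-07-08 at one-hour steps yields 168 timestamps.

```python
from datetime import datetime, timedelta

def create_time_range(start_time, end_time, step_size):
    # Same logic as the helper above: one entry per whole step in the interval.
    return [start_time + step_size * n
            for n in range(int((end_time - start_time) / step_size))]

time_range = create_time_range(
    datetime(2019, 7, 1), datetime(2019, 7, 8), timedelta(seconds=3600)
)
print(len(time_range))   # 168 hourly timestamps
print(time_range[0])     # 2019-07-01 00:00:00
print(time_range[-1])    # 2019-07-07 23:00:00
```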
data_sources.json currently contains only a single CSV file of a few MB. What is causing the OutOfMemoryError, or how can I get more detailed error reporting?
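One way to attack both halves of that question, sketched as an assumption rather than a confirmed fix: the stack trace shows the driver JVM running out of heap while building a string, so giving the driver more memory (and raising the maxToStringFields limit that the warning in the log mentions) may either let show() succeed or at least produce a fuller plan dump. The values below are illustrative; note that spark.driver.memory set in code only takes effect for a locally launched driver before the first session exists, and on Databricks driver memory is normally set in the cluster configuration instead.

```python
from pyspark.sql import SparkSession

# Hypothetical tuning sketch -- illustrative values, not a confirmed fix.
spark = (
    SparkSession.builder
    # Only effective if set before the driver JVM starts (local mode, first
    # session); on Databricks, configure driver memory on the cluster instead.
    .config("spark.driver.memory", "4g")
    # Lets Spark print more of the plan instead of truncating it (see the
    # "Truncated the string representation of a plan" warning in the log).
    .config("spark.debug.maxToStringFields", 100)
    .getOrCreate()
)
```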
Update 1: I changed insert_time_range to:
def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, lit(time_range[0]))
    return df_exploded.select([timestamp_column_name, variable_column_name, value_column_name])
Before the .show() call I added the line

print(df_with_intervals.count())

which prints the number 5, as expected. But when I then call show(), I get the same OutOfMemoryError.
Update 2: I changed insert_time_range to the approach suggested in the comments:
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    schema = StructType(
        [
            StructField(timestamp_column_name, TimestampType(), True),
            StructField(value_column_name, DoubleType(), True)
        ]
    )
    df_time_range = df.sql_ctx.createDataFrame(
        [(timestamp, None) for timestamp in time_range],
        schema=schema
    )
    df_time_range = df.select([variable_column_name]).distinct().crossJoin(df_time_range).select(
        [timestamp_column_name, variable_column_name, value_column_name]
    )
    df_time_range.show(n=20, truncate=False)
    return df.union(df_time_range)
C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/09 23:00:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/09 23:00:30 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 44:==================================> (3 + 2) / 5]19/09/09 23:00:43 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
[Stage 45:===========> (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp |Variable |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0 |
|2019-07-01 00:00:06.664|Load % SB DG|0.0 |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
+-------------------+------------+-----+
|Timestamp |Variable |Value|
+-------------------+------------+-----+
|2019-06-30 22:00:00|Load % PS DG|null |
|2019-06-30 22:00:00|Power PS DG |null |
|2019-06-30 22:00:00|Power Shore |null |
|2019-06-30 22:00:00|Load % SB DG|null |
|2019-06-30 22:00:00|Power SB DG |null |
|2019-06-30 22:01:00|Load % PS DG|null |
|2019-06-30 22:01:00|Power PS DG |null |
|2019-06-30 22:01:00|Power Shore |null |
|2019-06-30 22:01:00|Load % SB DG|null |
|2019-06-30 22:01:00|Power SB DG |null |
|2019-06-30 22:02:00|Load % PS DG|null |
|2019-06-30 22:02:00|Power PS DG |null |
|2019-06-30 22:02:00|Power Shore |null |
|2019-06-30 22:02:00|Load % SB DG|null |
|2019-06-30 22:02:00|Power SB DG |null |
|2019-06-30 22:03:00|Load % PS DG|null |
|2019-06-30 22:03:00|Power PS DG |null |
|2019-06-30 22:03:00|Power Shore |null |
|2019-06-30 22:03:00|Load % SB DG|null |
|2019-06-30 22:03:00|Power SB DG |null |
+-------------------+------------+-----+
only showing top 20 rows
Traceback (most recent call last):
File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 46, in <module>
df_with_intervals.sort([timestamp_column_name, variable_column_name]).show(n=5, truncate=False)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
print(self._jdf.showString(n, int(truncate), vertical))
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o333.showString.
: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
at com.trueaccord.scalapb.textformat.TextGenerator.add(TextGenerator.scala:19)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:33)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
Process finished with exit code 1
Update 3: The problem seems to be related to the union method, but I do not know what the issue is. Originally config/data_sources.json contained only one CSV file, so the line

df = df.union(df_to_append)

was never executed. Now that I have added multiple CSV files to config/data_sources.json, the union is executed and I again get the

py4j.protocol.Py4JJavaError: An error occurred while calling o2043.showString.
: java.lang.OutOfMemoryError: Java heap space

error, and it already happens on the first union. Am I using this method incorrectly, or is there a bug in the method itself?
Best answer

It probably comes from the explode you are doing. You are essentially cross-joining all the rows generated from the JSON-configured files with the datetimes in time_range, which has 168 elements.

I would first replace explode with F.lit() to see if it runs. If there is still a problem, I would remove the union code and try again.
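To put a number on the cross join the answer describes: the sample output shows 5 distinct variables, and time_range holds 168 hourly timestamps, so the generated frame contributes 5 × 168 = 840 null-valued rows — tiny in data terms, which hints that the heap is being exhausted by plan/text generation rather than by the rows themselves. A stdlib simulation of that row count (variable names taken from the sample output above):

```python
from itertools import product
from datetime import datetime, timedelta

# The 5 variables visible in the sample output above.
variables = ["Load % PS DG", "Load % SB DG", "Power PS DG", "Power SB DG", "Power Shore"]

# 168 hourly steps over the week 2019-07-01 .. 2019-07-08.
start = datetime(2019, 7, 1)
time_range = [start + timedelta(hours=n) for n in range(168)]

# Each (variable, timestamp) pair becomes one null-valued row, analogous to
# the explode / cross-join in insert_time_range.
rows = [(ts, var, None) for var, ts in product(variables, time_range)]
print(len(rows))  # 840
```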
Regarding "apache-spark - PySpark dataframe operations causing OutOfMemoryError", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57787724/