- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个带有 MapType 字段的 Dataframe。
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> fields = StructType([
... StructField('timestamp', TimestampType(), True),
... StructField('other_field', StringType(), True),
... StructField('payload', MapType(
... keyType=StringType(),
... valueType=StringType()),
... True), ])
>>> import datetime
>>> rdd = sc.parallelize([[datetime.datetime.now(), 'this should be in', {'akey': 'aValue'}]])
>>> df = rdd.toDF(fields)
>>> df.show()
+--------------------+-----------------+-------------------+
| timestamp| other_field| payload|
+--------------------+-----------------+-------------------+
|2018-01-10 12:56:...|this should be in|Map(akey -> aValue)|
+--------------------+-----------------+-------------------+
我想将 other_field
添加为 payload
字段中的键。
我知道我可以使用 udf:
>>> def _add_to_map(name, value, map_field):
... map_field[name] = value
... return map_field
...
>>> add_to_map = udf(_add_to_map, MapType(StringType(),StringType()))
>>> df.select(add_to_map(lit('other_field'), 'other_field', 'payload')).show(1, False)
+------------------------------------------------------+
|PythonUDF#_add_to_map(other_field,other_field,payload)|
+------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue) |
+------------------------------------------------------+
有没有没有 udf
的方法?
最佳答案
如果您提前知道 key ,这里有一种不使用 udf
的方法。使用 create_map
函数。至于这是否更高效,我不知道。
from pyspark.sql.functions import col, lit, create_map
df.select(
create_map(
lit('other_field'),
col('other_field'),
lit('akey'),
col('payload')['akey']
)
).show(n=1, truncate=False)
输出:
+-----------------------------------------------------+
|map(other_field, other_field, akey, payload['akey']) |
+-----------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue)|
+-----------------------------------------------------+
这是一种无需对字典键进行硬编码的方法。不幸的是,它涉及一个 collect()
操作。
首先,让我们修改您的原始数据框,在 MapType()
字段中再包含一个键值对。
from pyspark.sql.functions import col, lit, create_map
import datetime
rdd = sc.parallelize(
[
[
datetime.datetime.now(),
'this should be in',
{'akey': 'aValue', 'bkey': 'bValue'}
]
]
)
df = rdd.toDF(fields)
df.show(n=1, truncate=False)
创建以下内容:
+--------------------------+-----------------+-----------------------------------+
|timestamp |other_field |payload |
+--------------------------+-----------------+-----------------------------------+
|2018-01-10 17:37:58.859603|this should be in|Map(bkey -> bValue, akey -> aValue)|
+--------------------------+-----------------+-----------------------------------+
使用 explode()
和 collect()
,您可以这样获取 key :
from pyspark.sql.functions import explode
keys = [
x['key'] for x in (df.select(explode("payload"))
.select("key")
.distinct()
.collect())
]
现在像上面那样使用create_map
,但是使用来自keys
的信息来动态创建键值对。我使用 reduce(add, ...)
因为 create_map
期望输入按顺序是键值对 - 我想不出另一种方法来展平列表.
from operator import add
df.select(
create_map
(
*([lit('other_field'), col('other_field')] + reduce(add, [[lit(k), col('payload').getItem(k)] for k in keys]))
)
).show(n=1, truncate=False)
+---------------------------------------------------------------------------+
|map(other_field, other_field, akey, payload['akey'], bkey, payload['bkey'])|
+---------------------------------------------------------------------------+
|Map(other_field -> this should be in, akey -> aValue, bkey -> bValue) |
+---------------------------------------------------------------------------+
关于python - 将新的键/值对添加到 Spark MapType 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48196203/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!