gpt4 book ai didi

scala - DataFrame 到 RDD[(String, String)] 的转换

转载 作者:可可西里 更新时间:2023-11-01 11:06:08 24 4
gpt4 key购买 nike

我想将 org.apache.spark.sql.DataFrame 转换为 org.apache.spark.rdd.RDD[(String, String)] 在 Databricks 中。 谁能帮忙?

背景(也欢迎更好的解决方案):我有一个 Kafka 流,它(经过一些步骤后)变成了一个 2 列数据框。我想将其放入 Redis 缓存中,第一列作为键,第二列作为值。

更具体地说输入的类型是这样的:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]。我尝试如下放入Redis:

sc.toRedisKV(lastContacts)(redisConfig)

错误信息如下所示:

notebook:20: error: type mismatch;
found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

我已经尝试过一些想法(比如函数 .rdd)但没有任何帮助。

最佳答案

如果要将行映射到不同的 RDD 元素,可以使用 df.map(row => ...) 将数据帧转换为 RDD。

例如:

val df = Seq(("table1",432),
("table2",567),
("table3",987),
("table1",789)).
toDF("tablename", "Code").toDF()

df.show()

+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+

val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

OR

val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]

请引用https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html关于 AnalysisException:必须使用 writeStream.start() 执行带有流源的查询

需要使用query等待查询终止。awaitTermination()防止进程在查询处于事件状态时退出。

关于scala - DataFrame 到 RDD[(String, String)] 的转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55335877/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com