gpt4 book ai didi

scala - Apache Spark 通过传递值进行映射和缩减

转载 作者:行者123 更新时间:2023-12-02 21:21:52 25 4
gpt4 key购买 nike

我对从 Cassandra 加载的 RDD 有一个简单的 mapreduce 作业。代码看起来像这样

sc.cassandraTable("app","channels").select("id").toArray.foreach((o) => {

val orders = sc.cassandraTable("fam", "table")
.select("date", "f2", "f3", "f4")
.where("id = ?", o("id")) # This o("id") is the ID i want later append to the finished list

val month = orders
.map( oo => {
var total_revenue = List(oo.getIntOption("f2"), oo.getIntOption("f3"), oo.getIntOption("f4")).flatten.reduce(_ + _)
(getDateAs("hour", oo.getDate("date")), total_revenue)
})
.reduceByKey(_ + _)
})

因此这段代码将收入相加并返回类似这样的内容

(2014-11-23 18:00:00, 12412)
(2014-11-23 19:00:00, 12511)

现在我想将其保存回 Cassandra 表 revenue_hour 但我需要该列表中的 ID,类似这样。

(2014-11-23 18:00:00, 12412, "CH1")
(2014-11-23 19:00:00, 12511, "CH1")

如何才能使用更多的(键,值)列表来完成这项工作?我怎样才能传递更多的值,这些值不应该被转换,而只是传递到最后,以便我可以将其保存回 Cassandra?

最佳答案

也许您可以使用一个类并通过流程使用它。我的意思是,定义 RevenueHour 类

case class RevenueHour(date: java.util.Date,revenue: Long, id: String)

然后在映射阶段构建一个中间 RevenueHour,然后在缩减阶段构建另一个中间 RevenueHour。

val map: RDD[(Date, RevenueHour)] = orders.map(row => 
(
getDateAs("hour", oo.getDate("date")),
RevenueHour(
row.getDate("date"),
List(row.getIntOption("f2"),row.getIntOption("f3"),row.getIntOption("f4")).flatten.reduce(_ + _),
row.getString("id")
)
)
).reduceByKey((o1: RevenueHour, o2: RevenueHour) => RevenueHour(getDateAs("hour", o1.date), o1.revenue + o2.revenue, o1.id))

我使用 o1 RevenueHour 因为 o1 和 o2 将具有相同的 key 和相同的 id(因为之前的 where 子句)。

希望有帮助。

关于scala - Apache Spark 通过传递值进行映射和缩减,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27092475/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com