
scala - Spark join operation based on two columns


I am trying to join two datasets on two columns. It worked when I joined on a single column, but it fails with the error below:

:29: error: value join is not a member of org.apache.spark.rdd.RDD[(String, String, (String, String, String, String, Double))]
    val FinalFact = fact.join(dimensionWithSK).map { case (nk1, nk2, ((parts1, parts2, parts3, parts4, amount), (sk, prop1, prop2, prop3, prop4))) => (sk, amount) }

Code:

    import org.apache.spark.rdd.RDD

    // Assigns a global, partition-ordered index to every element of an RDD.
    def zipWithIndex[T](rdd: RDD[T]) = {
      // Count the elements in each partition.
      val partitionSizes = rdd.mapPartitions(p => Iterator(p.length)).collect

      // Fold the counts into (start, end) index ranges, one per partition.
      val ranges = partitionSizes.foldLeft(List((0, 0))) { case (accList, count) =>
        val start = accList.head._2
        val end = start + count
        (start, end) :: accList
      }.reverse.tail.toArray

      // Zip each partition with its slice of the global index space.
      rdd.mapPartitionsWithIndex( (index, partition) => {
        val start = ranges(index)._1
        val end = ranges(index)._2
        val indexes = Iterator.range(start, end)
        partition.zip(indexes)
      })
    }
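(As an aside, on Spark 1.0 and later this helper may not be needed at all: the built-in RDD.zipWithIndex() computes the same per-partition offsets, though it returns a Long index rather than the Int produced above. A minimal sketch, assuming a recent Spark version:)

    // Built-in since Spark 1.0: pairs each element with a stable
    // global index derived from per-partition counts, as above.
    val indexed: RDD[(String, Long)] =
      sc.parallelize(Seq("a", "b", "c")).zipWithIndex()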

    val dimension = sc.
      textFile("dimension.txt").
      map { line =>
        val parts = line.split("\t")
        (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
      }

    val dimensionWithSK =
      zipWithIndex(dimension).map { case ((nk1, nk2, prop3, prop4, prop5, prop6), idx) =>
        (nk1, nk2, (prop3, prop4, prop5, prop6, idx + nextSurrogateKey))
      }

    val fact = sc.
      textFile("fact.txt").
      map { line =>
        val parts = line.split("\t")
        // we need to output (NaturalKey, (FactId, Amount)) in
        // order to be able to join with the dimension data.
        (parts(0), parts(1), (parts(2), parts(3), parts(4), parts(5), parts(6).toDouble))
      }

    val finalFact = fact.join(dimensionWithSK).map {
      case (nk1, nk2, ((parts1, parts2, parts3, parts4, amount), (sk, prop1, prop2, prop3, prop4))) =>
        (sk, amount)
    }

Could someone help me with this? Thanks, Sridhar.

Best Answer

If you look at the signature of join, you'll see it is only defined for pair RDDs:

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
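This is why the compiler reports "value join is not a member": join is not declared on RDD itself, but is added to RDD[(K, V)] through the implicit PairRDDFunctions conversion, which never applies to a 3-tuple. A minimal illustration (the variables are placeholders):

    import org.apache.spark.rdd.RDD

    val pairs: RDD[(String, Int)] = ???           // 2-tuples: join compiles
    val triples: RDD[(String, String, Int)] = ???

    // pairs.join(pairs)     // OK: implicit PairRDDFunctions supplies join
    // triples.join(triples) // compile error: not a (K, V) shape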

You have a triple, though. My guess is that you are trying to join on the first two elements of the tuple, so you need to map each triple into a pair whose first element is itself a pair holding the triple's first two elements, e.g. for any types V1 and V2:

    val left: RDD[(String, String, V1)] = ???   // some RDD
    val right: RDD[(String, String, V2)] = ???  // some RDD

    left.map {
      case (key1, key2, value) => ((key1, key2), value)
    }.join(
      right.map {
        case (key1, key2, value) => ((key1, key2), value)
      })

This gives you an RDD of the form RDD[((String, String), (V1, V2))].
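Applied to the code in the question, a sketch along those lines might look like this (assuming the tuple shapes built earlier; note that in dimensionWithSK the surrogate key is the last element of the value tuple, not the first as the failing pattern assumed):

    // Re-key both RDDs on the composite natural key (nk1, nk2),
    // turning each triple into a (key, value) pair before joining.
    val keyedFact = fact.map { case (nk1, nk2, measures) => ((nk1, nk2), measures) }
    val keyedDim  = dimensionWithSK.map { case (nk1, nk2, props) => ((nk1, nk2), props) }

    val finalFact = keyedFact.join(keyedDim).map {
      case (_, ((_, _, _, _, amount), (_, _, _, _, sk))) => (sk, amount)
    }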

Regarding scala - Spark join operation based on two columns, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/23191363/
