gpt4 book ai didi

apache-spark - Spark - 将字符串 ID 转换为唯一的整数 ID

转载 作者:行者123 更新时间:2023-12-04 03:58:51 26 4
gpt4 key购买 nike

我有一个看起来像这样的数据集,其中每个用户和产品 ID 都是一个字符串:

userA, productX
userA, productX
userB, productY

拥有约 280 万种产品和 3 亿用户;约 21 亿个用户-产品关联。

我的最终目标是在这个数据集上运行 Spark 协同过滤 (ALS)。由于用户和产品需要 int 键,我的第一步是为每个用户和产品分配一个唯一的 int,并转换上面的数据集,以便用户和产品用 int 表示。

这是我迄今为止尝试过的:
val rawInputData = sc.textFile(params.inputPath)
.filter { line => !(line contains "\\N") }
.map { line =>
val parts = line.split("\t")
(parts(0), parts(1)) // user, product
}

// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()

// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()

idx1map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx1Out)
idx2map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx2Out)

// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap{
case (id1, (id2s, idx1s)) =>
val idx1 = idx1s.head
id2s.map { (_, idx1)
}
}

// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap{
case (id2, (idx1s, idx2s)) =>
val idx2 = idx2s.head
idx1s.map{ (_, idx2)
}
}

// save output
val convertedInts = converted.map{
case (a,b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)

当我尝试在我的集群上运行它时(40 个执行器,每个执行器具有 5 GB RAM),它能够很好地生成 idx1map 和 idx2map 文件,但它会因内存不足错误而失败,并且在 cogroup 之后的第一个 flatMap 处获取失败。我之前没有对 Spark 做过很多事情,所以我想知道是否有更好的方法来实现这一点;我不太清楚这项工作中的哪些步骤会很昂贵。当然,cogroup 需要在整个网络中混洗整个数据集;但这样的事情是什么意思?
FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)

我不只是使用散列函数的原因是我最终希望在更大的数据集上运行它(大约 10 亿个产品、10 亿个用户、350 亿个关联)和 Int 键冲突的数量会变得相当大。在这种规模的数据集上运行 ALS 甚至接近可行吗?

最佳答案

我看起来您实际上是在收集所有用户列表,只是为了再次拆分它们。尝试只使用 join 而不是 cogroup,在我看来,这更像是你想要的。例如:

import org.apache.spark.SparkContext._
// Create some fake data
val data = sc.parallelize(Seq(("userA", "productA"),("userA", "productB"),("userB", "productB")))
val userId = sc.parallelize(Seq(("userA",1),("userB",2)))
val productId = sc.parallelize(Seq(("productA",1),("productB",2)))

// Replace userName with ID's
val userReplaced = data.join(userId).map{case (_,(prod,user)) => (prod,user)}
// Replace product names with ID's
val bothReplaced = userReplaced.join(productId).map{case (_,(user,prod)) => (user,prod)}

// Check results:
bothReplaced.collect()) // Array((1,1), (1,2), (2,2))

请对它的表现发表评论。

(我不知道 FetchFailed(...) 是什么意思)

关于apache-spark - Spark - 将字符串 ID 转换为唯一的整数 ID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28097333/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com