gpt4 book ai didi

apache-spark - Spark Word2VecModel 超过了用于保存的最大 RPC 大小

转载 作者:行者123 更新时间:2023-12-04 04:02:36 24 4
gpt4 key购买 nike

我正在训练一个 Word2Vec 模型,该模型在 200 个维度的基础上具有相当重要的单个术语(~ 100k)。

Spark 的典型 W2V 模型化目前加起来主要由每个单词的向量组成的内存使用量,即:numberOfDimensions*sizeof(float)*numberOfWords .算一下,上面的数量级是100MB,给或取。
考虑到我仍在研究我的标记器并且仍在为最佳向量大小进行测试,我实际上是在对 75k-150k 单词和 100 到 300 维的字典进行计算,所以假设模型可以达到 ~500MB。

现在一切都很好,直到保存这个模型。目前在 Spark 中以这种方式实现:

override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq)
val dataPath = new Path(path, "data").toString
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}

也就是说:创建了 1 行的数据框,该行包含所有向量的大 f(l)at 数组。数据框保存为 Parquet 。没关系...除非...您必须将其运送给执行人。您在集群模式下执行的操作。

这最终会破坏工作,堆栈跟踪如下:
16/11/28 11:29:00 INFO scheduler.DAGScheduler: Job 3 failed: parquet at Word2Vec.scala:311, took 5,208453 s  
16/11/28 11:29:00 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 32:5 was 204136673 bytes,
which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes).
Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)

重现的简单代码(您不能在本地对其进行 spark-shell,但您需要将其发送到集群):
object TestW2V {

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("TestW2V").getOrCreate()
import spark.implicits._

// Alphabet
val randomChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTYVWXTZ".toCharArray
val random = new java.util.Random()

// Dictionnary
def makeWord(wordLength: Int): String = new String((0 until wordLength).map(_ => randomChars(random.nextInt(randomChars.length))).toArray)
val randomWords = for (wordIndex <- 0 to 100000) // Make approx 100 thousand distinct words
yield makeWord(random.nextInt(10)+5)

// Corpus (make it fairly non trivial)
def makeSentence(numberOfWords: Int): Seq[String] = (0 until numberOfWords).map(_ => randomWords(random.nextInt(randomWords.length)))
val allWordsDummySentence = randomWords // all words at least once
val randomSentences = for (sentenceIndex <- 0 to 100000)
yield makeSentence(random.nextInt(10) +5)
val corpus: Seq[Seq[String]] = allWordsDummySentence +: randomSentences

// Train a W2V model on the corpus
val df = spark.createDataFrame(corpus.map(Tuple1.apply))
import org.apache.spark.ml.feature.Word2Vec
val w2v = new Word2Vec().setVectorSize(250).setMinCount(1).setInputCol("_1").setNumPartitions(4)
val w2vModel = w2v.fit(df)
w2vModel.save("/home/Documents/w2v")

spark.stop
}
}

现在...我想我对内部结构的了解已经足够深入,可以理解为什么会发生这种情况。问题是:
  • 我做对了(我的 API 使用正确吗?)
  • 我怎么能解决它? spark.mllib.feature.Word2VecModel (“已弃用”基于 RDD 的 1.x 版本)有一个公共(public)构造函数,我可以通过滚动我自己的、正确分区的保存/加载实现来手动使用。但是新的spark.ml.feature.Word2VecModel不提供我可以看到的公共(public)构造函数。
  • 如果任何 Spark 贡献者以这种方式出现:这会被视为错误/可能的改进吗?

  • 考虑到 spark 团队修复了这个 JIRA: https://issues.apache.org/jira/browse/SPARK-11994 , (用于 1.x API),我猜他们确实对 2.0 API 进行了仔细检查,但我做错了:-)。

    要知道我想我可以在本地模式下运行它,并避免最终的任务序列化,但这充其量只是一个临时解决方案,这在生产级别是不可能的(数据可访问性和所有......)。或者将 RPC 大小破解为 512MB,当然...

    PS:上述情况发生在 Spark 2.0.1 和 spark 独立集群上(在本地模式下不可复制)。
    我通常会将此类消息发布到用户邮件列表,但看到 Spark encourages the use of SO , 开始...

    最佳答案

    我和你的经历完全一样。它在本地运行良好,但在集群模式下它死了,没有按照你的建议将 RPC 大小增加到 512mb。

    即通过spark.rpc.message.maxSize=512让我过去。

    我也同意保存的实现看起来很可疑,尤其是 repartition(1)少量。

    关于apache-spark - Spark Word2VecModel 超过了用于保存的最大 RPC 大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40842736/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com