
scala - NullPointerException in org.apache.spark.ml.feature.Tokenizer


I want to apply TF-IDF separately to the title and description fields, then combine those features in a VectorAssembler so that the final classifier can operate on them.

It works fine if I use a simple sequential flow:

titleTokenizer -> titleHashingTF -> VectorAssembler

but I need it like this:

titleTokenizer       -> titleHashingTF       \
                                              -> VectorAssembler
descriptionTokenizer -> descriptionHashingTF /

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.log4j.{Level, Logger}


object SimplePipeline {
  def main(args: Array[String]) {

    // setup boilerplate
    val conf = new SparkConf()
      .setAppName("Pipeline example")
    val sc = new SparkContext(conf)
    val spark = SparkSession
      .builder()
      .appName("Session for SimplePipeline")
      .getOrCreate()

    val all_df = spark.read.json("file:///Users/me/data.json")
    val numLabels = all_df.count()

    // split into training and testing
    val Array(training, testing) = all_df.randomSplit(Array(0.75, 0.25))
    val nTraining = training.count()
    val nTesting = testing.count()

    println(s"Loaded $nTraining training labels...")
    println(s"Loaded $nTesting testing labels...")

    // convert string labels to integers
    val indexer = new StringIndexer()
      .setInputCol("rating")
      .setOutputCol("label")

    // tokenize our string inputs
    val titleTokenizer = new Tokenizer()
      .setInputCol("title")
      .setOutputCol("title_words")
    val descriptionTokenizer = new Tokenizer()
      .setInputCol("description")
      .setOutputCol("description_words")

    // count term frequencies
    val titleHashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(titleTokenizer.getOutputCol)
      .setOutputCol("title_tfs")
    val descriptionHashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(descriptionTokenizer.getOutputCol)
      .setOutputCol("description_tfs")

    // combine features together
    val assembler = new VectorAssembler()
      .setInputCols(Array(titleHashingTF.getOutputCol, descriptionHashingTF.getOutputCol))
      .setOutputCol("features")

    // set params for our model
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)

    // pipeline that combines all stages
    val stages = Array(indexer, titleTokenizer, titleHashingTF, descriptionTokenizer, descriptionHashingTF, assembler, lr)
    val pipeline = new Pipeline().setStages(stages)

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Make predictions.
    val predictions = model.transform(testing)

    // Select example rows to display.
    predictions.select("label", "rawPrediction", "prediction").show()

    sc.stop()
  }
}

My data file is simply a file of newline-delimited JSON objects:

{"title" : "xxxxxx", "description" : "yyyyy" .... }
{"title" : "zzzzzz", "description" : "zxzxzx" .... }

The error I get is long and hard to read, but the important part (I think) is the java.lang.NullPointerException:

ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 12)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
... 23 more

How should I properly design my pipeline to do this?

(I'm also completely new to Scala.)

Best Answer

The problem here is that you don't validate your data, and some of the values are null. This is very easy to reproduce:

import spark.implicits._  // needed for .toDF on a local Seq

val df = Seq((1, Some("abcd bcde cdef")), (2, None)).toDF("id", "description")

val tokenizer = new Tokenizer().setInputCol("description")
tokenizer.transform(df).foreach(_ => ())
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array<string>)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
...
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
...
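The same nulls show up in real data whenever a JSON object omits the title or description field, since spark.read.json fills missing fields with null. As a quick check, you can count the offending rows (a minimal sketch, assuming the all_df DataFrame from the question):

// Count rows where either text column is null; any non-zero count
// means the corresponding Tokenizer will hit this NPE.
val nBad = all_df
  .filter(all_df("title").isNull || all_df("description").isNull)
  .count()
println(s"Rows with null title or description: $nBad")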

You can, for example, drop the offending rows:

tokenizer.transform(df.na.drop(Array("description")))

or replace the nulls with empty strings:

tokenizer.transform(df.na.fill(Map("description" -> "")))

whichever makes more sense for your application.
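Applied to the pipeline in the question, the cleanup only needs to happen before fit and transform. A minimal sketch, assuming the column names from your code and that empty strings are an acceptable substitute:

// Replace null text columns with empty strings before the pipeline runs,
// so that neither Tokenizer ever receives a null input.
val cleaned = all_df.na.fill(Map("title" -> "", "description" -> ""))

val Array(training, testing) = cleaned.randomSplit(Array(0.75, 0.25))
val model = pipeline.fit(training)
val predictions = model.transform(testing)

As a side note, HashingTF alone produces raw term frequencies rather than TF-IDF; if you want true TF-IDF weighting, you could insert an org.apache.spark.ml.feature.IDF stage after each HashingTF and feed the IDF output columns into the VectorAssembler instead.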

Regarding scala - NullPointerException in org.apache.spark.ml.feature.Tokenizer, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41657152/
