
java - How to use a method from another class inside Spark's map function in Java


import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class RDDExample {
    public static void main(String[] args) {
        final JavaSparkContext sc = SparkSingleton.getContext();
        Lemmatizer lemmatizer = new Lemmatizer();
        List<String> dirtyTwits = Arrays.asList(
                "Shipment of gold arrived in a truck",
                "Delivery of silver arrived in a silver truck",
                "Shipment of gold damaged in a fire"
                // etc., imagine the rest yourself :)
        );
        JavaRDD<String> twitsRDD = sc.parallelize(dirtyTwits);

        JavaRDD<List<String>> lemmatizedTwits = twitsRDD.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String s) throws Exception {
                return lemmatizer.Execute(s); // returns List<String>
            }
        });
        System.out.println(lemmatizedTwits.collect());
    }
}

I wrote this code, but at runtime I get Exception in thread "main" org.apache.spark.SparkException: Task not serializable. I searched Google but could not find a Java solution for my case, only the ubiquitous Scala code or trivial operations like return s + "qwer". Where can I read about how to call methods of other classes inside .map? Or can someone tell me how this works? Sorry for my bad English. Full traceback:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.map(RDD.scala:271)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
at RDDExample.main(RDDExample.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: preprocessor.coreNlp.Lemmatizer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 11 more

Full log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/01/15 00:45:49 INFO SecurityManager: Changing view acls to: ntsfk
17/01/15 00:45:49 INFO SecurityManager: Changing modify acls to: ntsfk
17/01/15 00:45:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ntsfk); users with modify permissions: Set(ntsfk)
17/01/15 00:45:50 INFO Slf4jLogger: Slf4jLogger started
17/01/15 00:45:50 INFO Remoting: Starting remoting
17/01/15 00:45:51 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:64122]
17/01/15 00:45:51 INFO Utils: Successfully started service 'sparkDriver' on port 64122.
17/01/15 00:45:51 INFO SparkEnv: Registering MapOutputTracker
17/01/15 00:45:51 INFO SparkEnv: Registering BlockManagerMaster
17/01/15 00:45:51 INFO DiskBlockManager: Created local directory at F:\Local\Temp\spark-local-20170115004551-eaac
17/01/15 00:45:51 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
17/01/15 00:45:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/15 00:45:53 INFO HttpFileServer: HTTP File server directory is F:\Local\Temp\spark-e041cd0f-83b9-46fa-b5d0-4fce800a2778
17/01/15 00:45:53 INFO HttpServer: Starting HTTP Server
17/01/15 00:45:53 INFO Utils: Successfully started service 'HTTP file server' on port 64123.
17/01/15 00:45:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/01/15 00:45:53 INFO SparkUI: Started SparkUI at http://DESKTOP-B29B6NA:4040
17/01/15 00:45:54 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:64122/user/HeartbeatReceiver
17/01/15 00:45:55 INFO NettyBlockTransferService: Server created on 64134
17/01/15 00:45:55 INFO BlockManagerMaster: Trying to register BlockManager
17/01/15 00:45:55 INFO BlockManagerMasterActor: Registering block manager localhost:64134 with 491.7 MB RAM, BlockManagerId(<driver>, localhost, 64134)
17/01/15 00:45:55 INFO BlockManagerMaster: Registered BlockManager
17/01/15 00:45:55 INFO StanfordCoreNLP: Adding annotator tokenize
17/01/15 00:45:55 INFO TokenizerAnnotator: TokenizerAnnotator: No tokenizer type provided. Defaulting to PTBTokenizer.
17/01/15 00:45:55 INFO StanfordCoreNLP: Adding annotator ssplit
17/01/15 00:45:55 INFO StanfordCoreNLP: Adding annotator pos
Reading POS tagger model from edu/stanford/nlp/models/pos-tagger/english-left3words/english-left3words-distsim.tagger ... done [3,5 sec].
17/01/15 00:45:59 INFO StanfordCoreNLP: Adding annotator lemma

After this point I get the exception.

Environment: Java 1.8, Spark 2.10.

Best Answer

The first approach usually chosen is to make Lemmatizer Serializable, but remember that serialization is not the only possible problem here. Spark executors rely heavily on multithreading, so any object used inside a closure should be thread-safe as well.
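For the first approach, here is a minimal sketch of a serializable Lemmatizer. It assumes, based on your log (which shows the tokenize, ssplit, pos and lemma annotators being loaded), that your class wraps a StanfordCoreNLP pipeline; the field names and the Execute body are illustrative, not your actual class. The key idea is to mark the non-serializable pipeline transient and build it lazily, so only the lightweight wrapper gets shipped to executors:

import java.io.Serializable;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import edu.stanford.nlp.ling.CoreAnnotations;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.pipeline.StanfordCoreNLP;

public class Lemmatizer implements Serializable {
    // The CoreNLP pipeline itself is not serializable, so keep it transient
    // and rebuild it lazily on whichever JVM (driver or executor) uses it.
    private transient StanfordCoreNLP pipeline;

    private StanfordCoreNLP pipeline() {
        if (pipeline == null) {
            Properties props = new Properties();
            props.setProperty("annotators", "tokenize, ssplit, pos, lemma");
            pipeline = new StanfordCoreNLP(props);
        }
        return pipeline;
    }

    public List<String> Execute(String text) {
        Annotation document = new Annotation(text);
        pipeline().annotate(document);
        return document.get(CoreAnnotations.TokensAnnotation.class).stream()
                .map(token -> token.get(CoreAnnotations.LemmaAnnotation.class))
                .collect(Collectors.toList());
    }
}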

If both conditions (serializability and thread safety) cannot be satisfied, another solution is to create a separate instance for each executor thread, for example using mapPartitions. A simple solution (although it is usually better to avoid collecting a whole partition like this) looks as follows:

twitsRDD.mapPartitions(iter -> {
    Lemmatizer lemmatizer = new Lemmatizer();
    List<List<String>> lemmas = new LinkedList<>();

    while (iter.hasNext()) {
        lemmas.add(lemmatizer.Execute(iter.next()));
    }

    return lemmas.iterator();
});

This should solve the serialization problem and work around some, but not all, thread-safety issues. Since recent versions of CoreNLP claim to be thread-safe, it should be good enough in your case.
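If the partitions are large, collecting every result into a LinkedList before returning can be avoided by wrapping the input iterator lazily. A minimal sketch, assuming a Spark version whose Java mapPartitions expects an Iterator result (as the lambda above, which returns lemmas.iterator(), already does):

import java.util.Iterator;
import java.util.List;

JavaRDD<List<String>> lemmatizedTwits = twitsRDD.mapPartitions(iter -> {
    Lemmatizer lemmatizer = new Lemmatizer(); // one instance per partition
    return new Iterator<List<String>>() {
        @Override
        public boolean hasNext() {
            return iter.hasNext();
        }

        @Override
        public List<String> next() {
            // Lemmatize one tweet at a time, on demand, instead of
            // materializing the whole partition up front.
            return lemmatizer.Execute(iter.next());
        }
    };
});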

About java - How to use a method from another class inside Spark's map function in Java: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41655502/
