
java - How to extract data from an RDD into a Java ArrayList?

Reposted. Author: 行者123. Updated: 2023-12-02 04:18:51

The obvious idea is to add the elements one by one.

ArrayList<String> myvalues = new ArrayList<String>();

myRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
    @Override
    public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
        myvalues.add(row.getString(0)); // Say I need only first element
    }
});

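This approach, and the alternatives I tried, throw org.apache.spark.SparkException: Task not serializable. I simplified the function further, and evidently I am doing something illogical: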

LOG.info("Let's see..");
queryRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
    @Override
    public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
        LOG.info("Value is : " + row.getString(0));
    }
});

There must be a simple way. Here is the stack trace for reference:

2015-10-08 10:16:48 INFO  UpdateStatementTemplateImpl:141 - Lets see.. 
2015-10-08 10:16:48 WARN GenericExceptionMapper:20 - Error while executing service
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1476)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:781)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:313)
at org.apache.spark.sql.api.java.JavaSchemaRDD.foreach(JavaSchemaRDD.scala:42)
at com.simility.cassandra.template.DeviceIDTemplateImpl.test(DeviceIDTemplateImpl.java:144)
at com.kumbay.service.admin.BusinessEntityService.testSignal(BusinessEntityService.java:1801)
at com.kumbay.service.admin.BusinessEntityService$$FastClassByCGLIB$$157ddd50.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:701)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:96)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:260)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:64)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:634)

Best answer

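I assume LOG and myvalues are members of the enclosing class. That means the entire class, as part of what the call "captures", would have to be serialized, which is not possible.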
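The capture problem can be reproduced without Spark. The following is a minimal sketch using plain Java serialization; `CaptureDemo`, `SerializableTask`, `Service`, and `StandaloneTask` are hypothetical names that stand in for the classes in the question, not Spark APIs. An anonymous class that uses a member of a non-serializable enclosing instance cannot be serialized, while a static nested class with no such reference can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    /** Hypothetical stand-in for a serializable callback such as Spark's VoidFunction. */
    interface SerializableTask extends Serializable {
        void call(String value);
    }

    /** Hypothetical stand-in for the service class in the question; it is NOT Serializable. */
    static class Service {
        SerializableTask anonymousTask() {
            // The anonymous class calls log(), so it holds a reference
            // to the enclosing Service instance.
            return new SerializableTask() {
                @Override
                public void call(String value) {
                    log(value);
                }
            };
        }

        void log(String value) {
            System.out.println("Value is : " + value);
        }
    }

    /** Static nested class: it carries no reference to any outer instance. */
    static class StandaloneTask implements SerializableTask {
        @Override
        public void call(String value) {
            System.out.println("Value is : " + value);
        }
    }

    /** Returns true if Java serialization accepts the object, false otherwise. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Service().anonymousTask())); // false
        System.out.println(canSerialize(new StandaloneTask()));          // true
    }
}
```

Spark performs a comparable serializability check on the function passed to foreach before shipping it, which is where the ClosureCleaner frames in the stack trace come from.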

Solution

First, replace LOG with a plain System.out.println and see whether that works.

Second, create a copy of the members you use within the call:

public void call(...) {
    Log log = LOG; // or
    ArrayList<String> inside = myvalues;
    inside.add(...);
}

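Third, never use an ArrayList inside foreach. The function runs on different nodes, and each node sees its own copy of the ArrayList, so you will never get the result you expect.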

Instead, use rdd.collect(...) to gather your results!
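The third point can also be illustrated without Spark. The sketch below assumes that shipping a task to another node behaves like a serialize and deserialize round trip; `CopyDemo`, `AddTask`, and `shipToExecutor` are hypothetical names used only for this illustration. The deserialized task appends to its own copy of the list, and the original list stays empty.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

public class CopyDemo {
    /** Serializable task that appends to the list it holds (stand-in for the foreach body). */
    static class AddTask implements Serializable {
        final ArrayList<String> target;

        AddTask(ArrayList<String> target) {
            this.target = target;
        }

        void call(String value) {
            target.add(value);
        }
    }

    /** Simulates sending a task to another JVM: serialize it, then deserialize a copy. */
    static AddTask shipToExecutor(AddTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (AddTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> driverList = new ArrayList<>();
        AddTask remote = shipToExecutor(new AddTask(driverList));
        remote.call("first");

        System.out.println(driverList.size());    // 0: the original list is untouched
        System.out.println(remote.target.size()); // 1: only the shipped copy changed
    }
}
```

Collecting the results back to the driver avoids this, because the data is returned to the calling JVM rather than written into a copy that lives elsewhere.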

For "java - How to extract data from an RDD into a Java ArrayList?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33012918/
