- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在使用 Spark 1.3.1 并尝试使用 mongo-hadoop connector 将 RDD 保存到 mongodb版本 1.3.2 和 mongo-java-driver 版本 3.0.1。当我在独立集群上运行下面的应用程序时,驱动程序被标记为失败。
这是我用来重现问题的代码,
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.bson.BasicBSONObject
import org.bson.BSONObject
object TestApp {
def testSaveRddToMongo() {
val sparkConf = new SparkConf().setAppName("Test")
val sc = new SparkContext(sparkConf)
val mongoConfig = new Configuration()
mongoConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat")
mongoConfig.set("mongo.input.uri", "mongodb://some.local.ip:27017/mydb.input")
val bsonRDD: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(mongoConfig, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
val reasons: RDD[String] = bsonRDD.map( tuple => {
tuple._2.asInstanceOf[BasicBSONObject].getString("fieldName").trim
}).distinct().cache()
val out: RDD[(String,Int)] = reasons.zipWithIndex().map { case (k,v) => (k,v.toInt)}
println (s"Saving ${out.count} elements")
val outputConfig = new Configuration()
outputConfig.set("mongo.job.output.format","com.mongodb.hadoop.MongoOutputFormat")
outputConfig.set("mongo.output.uri", "mongodb://some.local.ip:27017/mydb.garbage")
out.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], outputConfig)
}
def main(args: Array[String]) {
testSaveRddToMongo()
}
}
在驱动程序的 stderr 中,我看到了这个
15/05/15 14:18:43 INFO DAGScheduler: Job 2 failed: saveAsNewAPIHadoopFile at Test.scala:39, took 6.491961 s
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:59)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 5.0 failed 4 times, most recent failure: Lost task 3.3 in stage 5.0 (TID 275, largo-ubuntu):
java.lang.IllegalStateException: The pool is closed
at com.mongodb.internal.connection.ConcurrentPool.get(ConcurrentPool.java:123)
at com.mongodb.connection.DefaultConnectionPool.getPooledConnection(DefaultConnectionPool.java:243)
at com.mongodb.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:90)
at com.mongodb.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:80)
at com.mongodb.connection.DefaultServer.getConnection(DefaultServer.java:69)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:86)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:184)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:177)
at com.mongodb.operation.BaseWriteOperation.execute(BaseWriteOperation.java:106)
at com.mongodb.operation.BaseWriteOperation.execute(BaseWriteOperation.java:58)
at com.mongodb.Mongo.execute(Mongo.java:745)
at com.mongodb.Mongo$2.execute(Mongo.java:728)
at com.mongodb.DBCollection.executeWriteOperation(DBCollection.java:327)
at com.mongodb.DBCollection.replaceOrInsert(DBCollection.java:405)
at com.mongodb.DBCollection.save(DBCollection.java:394)
at com.mongodb.DBCollection.save(DBCollection.java:367)
at com.mongodb.hadoop.output.MongoRecordWriter.write(MongoRecordWriter.java:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
为什么连接被关闭了?其他地方是否有我没有看到的异常(exception)情况?
修复
按照下面的 maasg,使用 casbah 编写结果是可行的。我更新了代码如下,
import com.mongodb.casbah.Imports._
...
println (s"Saving ${out.count} elements")
val uri = MongoClientURI("mongodb://some.local.ip:27017/mydb.garbage")
val mongoClient = MongoClient(uri)
val collection = mongoClient(uri.database.get)(uri.collection.get)
collection.drop()
val builder = collection.initializeUnorderedBulkOperation
for ((value, index) <- out.collect()) { builder.insert(MongoDBObject(("_id" -> value), ("value" -> index))) }
builder.execute()
更好的修复
这是一个更好的版本,它会为每个分区做一批写入
...
def dropCollection(uriString: String) {
val uri = MongoClientURI(uriString)
val mongoClient = MongoClient(uri)
val collection = mongoClient(uri.database.get)(uri.collection.get)
mongoClient.close()
}
def saveReultsToMongo(out: RDD[(String,Int)], uriString: String) {
out.foreachPartition( itr => {
val uri = MongoClientURI(uriString)
val mongoClient = MongoClient(uri)
val collection = mongoClient(uri.database.get)(uri.collection.get)
val builder = collection.initializeUnorderedBulkOperation
for ( (value, index) <- itr ){ builder.insert(MongoDBObject(("_id" -> value), ("value" -> index))) }
builder.execute
mongoClient.close
})
}
...
println (s"Saving ${out.count} elements")
dropCollection("mongodb://10.22.128.84:27017/Minerva.garbage")
saveReultsToMongo(out, "mongodb://10.22.128.84:27017/Minerva.garbage")
一些注意事项,
out.foreach{ case (value, index) => builder.insert(MongoDBObject(("_id"-> value), ("value"-> index))) }
不工作是因为 BulkWriteOperation 不可序列化
out.foreachPartition
可以按照 maasg 和 BETTER FIX 使用最佳答案
在 1.4 版发布之前,hadoop-mongo 连接器无法可靠地与 Spark 配合使用。高并行负载会泄漏客户端连接,导致失败。在我们的例子中,这个错误是关键点:https://jira.mongodb.org/browse/HADOOP-143如您所见,它已合并到 1.4 版本中。
作为解决方法,我可以推荐使用 casbah client + 批处理操作(围绕 Java 客户端的 Scala 包装器)。
关于mongodb - RDD 仅部分写入 mongo,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30267050/
我有这个代码 var myChart = new FusionCharts("../themes/clean/charts/hbullet.swf", "myChartId", "400", "75
既然写入是立即进行的(复制到内核缓冲区并返回),那么使用 io_submit 进行写入有什么好处? 事实上,它 (aio/io_submit) 看起来更糟,因为您必须在堆上分配写入缓冲区并且不能使用基
我正在使用 mootool 的 Request.JSON 从 Twitter 检索推文。收到它后,我将写入目标 div 的 .innerHTML 属性。当我在本地将其作为文件进行测试时,即 file:
最终,我想将 Vertica DB 中的数据抓取到 Spark 中,训练机器学习模型,进行预测,并将这些预测存储到另一个 Vertica DB 中。 当前的问题是确定流程最后部分的瓶颈:将 Spark
我使用 WEKA 库编写了一个 Java 程序, 训练分类算法 使用经过训练的算法对未标记的数据集运行预测 将结果写入 .csv 文件 问题在于它当前写出离散分类结果(即算法猜测一行属于哪个类别)。我
背景 - 我正在考虑使用 clickonce 通过 clickonce(通过网站)部署 WinForms 应用程序。相对简单的应用程序的要素是: - 它是一个可执行文件和一个数据库文件(sqlite)
是否有更好的解决方案来快速初始化 C 数组(在堆上创建)?就像我们使用大括号一样 double** matrix_multiply(const double **l_matrix, const dou
我正在读取 JSON 文件,取出值并进行一些更改。 基本上我向数组添加了一些值。之后我想将其写回到文件中。当我将 JSONArray 写回文件时,会被写入字符串而不是 JSONArray 对象。怎样才
我为两个应用程序使用嵌入式数据库,其中一个是服务器,另一个是客户端。客户端应用程序。可以向服务器端发送获取数据请求以检索数据并显示在表格(或其他)中。问题是这样的:如何将获取的数据保存(写入)到页面文
是否有更好的解决方案来快速初始化 C 数组(在堆上创建)?就像我们使用大括号一样 double** matrix_multiply(const double **l_matrix, const dou
从问题得出问题:找到所有 result = new ArrayList(); for (int i = 2; i >(i%8) & 0x1) == 0) { result.add(i
由于某种原因,它没有写入 CSV。谁能明白为什么它不写吗? def main(): list_of_emails = read_email_csv() #read input file, cr
关闭。 这个问题是 not reproducible or was caused by typos 。它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能在这里出现,
我目前正在开发一个保存和加载程序,但我无法获得正确的结果。 编写程序: #include #include #define FILENAME "Save" #define COUNT 6 type
import java.io.*; public class Main2 { public static void main(String[] args) throws Exception {
我需要使用预定义位置字符串“Office”从所有日历中检索所有 iOS 事件,然后将结果写入 NSLog 和 UITextView。 到目前为止,这是我的代码: #import "ViewCo
我正在尝试将 BOOL 值写入 PFInstallation 中的列,但会不停地崩溃: - (IBAction)pushSwitch:(id)sender { NSUserDefaults *push
我以前在学校学过一些简单的数据库编程,但现在我正在尝试学习最佳实践,因为我正在编写更复杂的应用程序。写入 MySQL 数据库并不难,但我想知道让分布式应用程序写入 Amazon EC2 上的远程数据库
是否可以写回到ResourceBundle?目前我正在使用 ResourceBundle 来存储信息,在运行时使用以下内容读取信息 while(ResourceBundle.getBundle("bu
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能是on-topi
我是一名优秀的程序员,十分优秀!