gpt4 book ai didi

mongodb - MongoDB 和 Spark 中的连接过多

转载 作者:可可西里 更新时间:2023-11-01 09:55:08 25 4
gpt4 key购买 nike

我的 Spark Streaming 应用程序将数据存储在 MongoDB 中。

不幸的是,每个 Spark worker 在将其存储在 MongoDB 中时打开了太多连接

enter image description here

以下是我的代码 Spark - Mongo DB 代码:

public static void main(String[] args) {

int numThreads = Integer.parseInt(args[3]);
String mongodbOutputURL = args[4];
String masterURL = args[5];

Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);

// Create a Spark configuration object to establish connection between the application and spark cluster
SparkConf sparkConf = new SparkConf().setAppName("AppName").setMaster(masterURL);

// Configure the Spark microbatch with interval time
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60*1000));

Configuration config = new Configuration();
config.set("mongo.output.uri", "mongodb://host:port/database.collection");

// Set the topics that should be consumed from Kafka cluster
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}

// Establish the connection between kafka and Spark
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});

JavaPairDStream<Object, BSONObject> save = lines.mapToPair(new PairFunction<String, Object, BSONObject>() {
@Override
public Tuple2<Object, BSONObject> call(String input) {
BSONObject bson = new BasicBSONObject();
bson.put("field1", input.split(",")[0]);
bson.put("field2", input.split(",")[1]);
return new Tuple2<>(null, bson);
}
});
// Store the records in database
save.saveAsNewAPIHadoopFiles("prefix","suffix" ,Object.class, Object.class, MongoOutputFormat.class, config);

jssc.start();
jssc.awaitTermination();
}

如何控制每个worker的连接数?

我是否缺少任何配置参数?

更新 1:

我正在使用带有 Java API 的 Spark 1.3。

我无法执行 coalesce() 但我能够执行 repartition(2) 操作。

现在没有连接受到控制。

但我认为连接没有被关闭或没有在 worker 中重用。

请看下面的截图:

流媒体间隔 1 分钟和 2 个分区 enter image description here

最佳答案

您可以尝试映射分区,它在分区级别而不是记录级别工作,即,在一个节点上执行的任务将共享一个数据库连接而不是每条记录。

另外我猜你可以使用预分区(而不是流 RDD)。 Spark 足够聪明,可以利用它来减少随机播放。

关于mongodb - MongoDB 和 Spark 中的连接过多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34437561/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com