
java - mongoDB and Spark: "com.mongodb.MongoSocketReadException: Prematurely reached end of stream"


I have a Java application that processes a Kafka stream of Avro messages and, for each message, runs a query against a MongoDB collection.

After correctly processing a few dozen messages, the application stops and throws "com.mongodb.MongoSocketReadException: Prematurely reached end of stream".

Here is the code:

    JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
            String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreach(avroRecord -> {
            byte[] encodedAvroData = avroRecord._2;
            LocationType t = deserialize(encodedAvroData);

            MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
            options_builder.maxConnectionIdleTime(60000);
            MongoClientOptions options = options_builder.build();
            MongoClient mongo = new MongoClient("localhost:27017", options);

            MongoDatabase database = mongo.getDatabase("DB");
            MongoCollection<Document> collection = database.getCollection("collection");

            Document myDoc = collection.find(eq("key", 4)).first();
            System.out.println(myDoc);
        });
    });

Best answer

First of all, you should not open a Mongo connection for every single record! And you should close each connection once you are done with it.

Mongo does not cope well with many connections (hundreds? thousands?) being opened and never closed.

Here is an example that opens a single Mongo connection per RDD partition instead:

    directKafkaStream.foreachRDD(rdd -> {
        rdd.foreachPartition(it -> {

            // Opens only 1 connection per partition
            MongoClient mongo = new MongoClient("localhost:27017");
            MongoDatabase database = mongo.getDatabase("DB");
            MongoCollection<Document> collection = database.getCollection("collection");

            while (it.hasNext()) {
                byte[] encodedAvroData = it.next()._2;
                LocationType t = deserialize(encodedAvroData);

                Document myDoc = collection.find(eq("key", 4)).first();
                System.out.println(myDoc);
            }

            mongo.close();
        });
    });
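
As a further refinement, note that even the per-partition approach above opens and closes a client for every partition of every micro-batch. Since MongoClient is thread-safe and maintains its own internal connection pool, a common pattern is to share one lazily initialized client per executor JVM. Below is a minimal sketch of that idea, assuming the same legacy MongoClient driver used above; the MongoClientHolder class is purely illustrative and not part of any library:

    import com.mongodb.MongoClient;

    // Illustrative helper: one MongoClient shared by all tasks in this executor JVM.
    public final class MongoClientHolder {

        // Lazily created on first use; MongoClient is thread-safe and pools connections.
        private static volatile MongoClient client;

        private MongoClientHolder() {}

        public static MongoClient get() {
            if (client == null) {
                synchronized (MongoClientHolder.class) {
                    if (client == null) {
                        client = new MongoClient("localhost:27017");
                        // Close the client when the executor JVM shuts down
                        Runtime.getRuntime().addShutdownHook(new Thread(client::close));
                    }
                }
            }
            return client;
        }
    }

With this in place, the body of foreachPartition would call MongoClientHolder.get() instead of constructing a new client, and the per-partition mongo.close() call would be dropped, since the shutdown hook handles cleanup.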

Regarding java - mongoDB and Spark: "com.mongodb.MongoSocketReadException: Prematurely reached end of stream", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47437665/
