MongoDB & Spark: "com.mongodb.MongoSocketReadException: Prematurely reached end of stream"

fslejnso posted on 2021-06-07 in Kafka

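I have a Java application that processes a Kafka stream of Avro messages and, for each message, runs a query against a MongoDB collection.
After correctly processing a few dozen messages, the application stops and throws "com.mongodb.MongoSocketReadException: Prematurely reached end of stream".
The code is as follows: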

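// Imports the snippet relies on (Spark Streaming Kafka 0.8 direct API, MongoDB Java driver 3.x)
import static com.mongodb.client.model.Filters.eq;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.bson.Document;

JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
        String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

directKafkaStream.foreachRDD(rdd -> {

    rdd.foreach(avroRecord -> {

        byte[] encodedAvroData = avroRecord._2;
        LocationType t = deserialize(encodedAvroData);

        // A new MongoClient (with its own connection pool) is created for every record...
        MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
        options_builder.maxConnectionIdleTime(60000);
        MongoClientOptions options = options_builder.build();
        MongoClient mongo = new MongoClient("localhost:27017", options);

        MongoDatabase database = mongo.getDatabase("DB");
        MongoCollection<Document> collection = database.getCollection("collection");

        Document myDoc = collection.find(eq("key", 4)).first();
        System.out.println(myDoc);

        // ...and it is never closed.
    });
});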
Answer by 6jjcrrmo:

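First, you should not open a Mongo connection for every record! Second, you should close your Mongo connections.
MongoDB does not like having many (hundreds? thousands?) connections opened without ever being closed. Each MongoClient manages its own connection pool, so creating one per record and never closing it leaves an ever-growing number of open sockets.
Here is an example that opens the Mongo connection only once per RDD partition: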
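To make the difference concrete, here is a minimal, dependency-free sketch that only counts clients. `FakeClient` is a hypothetical stand-in for `MongoClient`. It does not talk to MongoDB; it just records how many instances were opened and how many are still open. The partitions are modelled as plain lists of strings, which is an assumption for illustration rather than Spark's actual types.

```java
import java.util.Arrays;
import java.util.List;

public class ConnectionCountDemo {

    // Counters shared by all FakeClient instances.
    static int opened = 0; // total clients ever created
    static int open = 0;   // clients created but not yet closed

    // Hypothetical stand-in for MongoClient: it only tracks open/close.
    static final class FakeClient implements AutoCloseable {
        FakeClient() { opened++; open++; }
        String query(String record) { return "doc for " + record; }
        @Override public void close() { open--; }
    }

    // Pattern from the question: a new client per record, never closed.
    static void perRecordNoClose(List<List<String>> partitions) {
        for (List<String> partition : partitions) {
            for (String record : partition) {
                FakeClient client = new FakeClient();
                client.query(record);
                // close() is never called, so this client stays open
            }
        }
    }

    // Pattern from the answer: one client per partition, always closed.
    static void perPartitionClosed(List<List<String>> partitions) {
        for (List<String> partition : partitions) {
            // try-with-resources closes the client even if a query throws
            try (FakeClient client = new FakeClient()) {
                for (String record : partition) {
                    client.query(record);
                }
            }
        }
    }

    static void reset() { opened = 0; open = 0; }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e", "f", "g"));

        reset();
        perRecordNoClose(partitions);
        System.out.println("per record:    opened=" + opened + " stillOpen=" + open);

        reset();
        perPartitionClosed(partitions);
        System.out.println("per partition: opened=" + opened + " stillOpen=" + open);
    }
}
```

With 7 records in 2 partitions, the per-record variant reports `opened=7 stillOpen=7`. The per-partition variant reports `opened=2 stillOpen=0`. In a long-running stream the first number keeps growing with every message.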

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(it -> {

        // Opens only 1 client (connection pool) per partition
        MongoClient mongo = new MongoClient("localhost:27017");
        try {
            MongoDatabase database = mongo.getDatabase("DB");
            MongoCollection<Document> collection = database.getCollection("collection");

            while (it.hasNext()) {
                byte[] encodedAvroData = it.next()._2;
                LocationType t = deserialize(encodedAvroData);

                Document myDoc = collection.find(eq("key", 4)).first();
                System.out.println(myDoc);
            }
        } finally {
            // Always release the client, even if processing a record throws
            mongo.close();
        }
    });
});
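A common variation goes one step further than the answer: keep a single client per executor JVM and reuse it across partitions and batches. It relies on the fact that one `MongoClient` is a thread-safe pool meant to be shared. The Spark Streaming guide's "design patterns for foreachRDD" section suggests a similar approach, a static, lazily initialized connection pool. The sketch below shows only the lazy-singleton part with the initialization-on-demand holder idiom. `FakeClient`, `ClientHolder` and `processPartition` are hypothetical names, and no real MongoDB or Spark API is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedClientDemo {

    // Counts how many clients were ever constructed in this JVM.
    static final AtomicInteger created = new AtomicInteger();

    // Hypothetical stand-in for a thread-safe, pooled client such as MongoClient.
    static final class FakeClient {
        FakeClient() { created.incrementAndGet(); }
        String query(String record) { return "doc for " + record; }
    }

    // Initialization-on-demand holder: the JVM initializes Holder (creating the
    // client) on the first call to get(), exactly once, even with concurrent callers.
    static final class ClientHolder {
        private ClientHolder() {}
        private static final class Holder {
            static final FakeClient INSTANCE = new FakeClient();
        }
        static FakeClient get() { return Holder.INSTANCE; }
    }

    // What each partition task would do: reuse the shared client, never create one.
    static int processPartition(List<String> partition) {
        FakeClient client = ClientHolder.get();
        int processed = 0;
        for (String record : partition) {
            client.query(record);
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate 4 partition tasks running concurrently in one executor JVM.
        List<Thread> tasks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            List<String> partition = Arrays.asList("r" + i + "a", "r" + i + "b");
            Thread t = new Thread(() -> processPartition(partition));
            tasks.add(t);
            t.start();
        }
        for (Thread t : tasks) {
            t.join();
        }
        System.out.println("clients created: " + created.get());
    }
}
```

Running `main` prints `clients created: 1`. The design trade-off is that the shared client lives as long as the executor, so closing it is left to JVM shutdown (for example via a shutdown hook), which this sketch omits. Because the client sits in a static field, it is not serialized with the Spark closure; each executor JVM creates its own instance on first use.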

相关问题