我有一个java应用程序,它处理avro消息的kafka流,并对每条消息执行mongodb集合上的查询。
在正确处理几十条消息之后,应用程序停止运行并抛出“com.mongodb.mongosocketreadexception:过早到达流的末尾”。
代码如下:
JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
LocationType t = deserialize(encodedAvroData);
MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
options_builder.maxConnectionIdleTime(60000);
MongoClientOptions options = options_builder.build();
MongoClient mongo = new MongoClient ("localhost:27017", options);
MongoDatabase database = mongo.getDatabase("DB");
MongoCollection<Document> collection = database.getCollection("collection");
Document myDoc = collection.find(eq("key", 4)).first();
System.out.println(myDoc);
});
});
1条答案
按热度按时间6jjcrrmo1#
首先,您不应该为每个记录打开mongo连接!然后你应该关闭你的mongo连接。
mongo不喜欢打开很多(成百上千?)而不关闭它们。
下面是通过rdd打开mongo连接的示例: