我正在尝试使用mongohadoop对mongo集合执行一些聚合操作(https://github.com/mongodb/mongo-hadoop)Spark图书馆。我使用mongo.input.query配置输入查询,该配置作为输入发送到newapihadooprdd。
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat";
mongodbConfig.set("mongo.input.uri","mongodb://"+mongodbHost+"/"+database.collection);
mongodbConfig.set("mongo.input.query",query);
JavaPairRDD audienceRDD = sc.newAPIHadoopRDD(mongodbConfig, MongoInputFormat.class,Object.class, BSONObject.class);
audienceRDD.foreach(e -> System.out.println("data: "+e.toString()));
query={ "aggregate" : "__collection__" , "pipeline" : [
{ "$match" : { "date" : { "$gte" : { "$date" : "2016-08-09T00:00:00.000Z"} , "$lte" : { "$date" : "2016-08-11T00:00:00.000Z"}}}} ,
{ "$unwind" : "$segments"} ,
{ "$group" : { "_id" : "$segments" , "audienceSize" : { "$sum" : "$count"}}}]}, sort={ }, fields={ }, limit=0, notimeout=false}
如果我使用像find这样的普通查询,那么操作是成功的。但是当我尝试使用groupby时,我在rdd上没有任何记录。有人能建议一种使用mongohadoop连接器对mongo集合执行聚合操作的方法吗。
1条答案
按热度按时间niwlg2el1#
无论如何,由于运行聚合查询的16MB限制,我最终用这些记录创建了一个临时集合,然后对该临时集合进行了查询。将响应存储在rdd上,一旦我完成了我想做的事情,我就放弃了临时收集。
也就是说,我认为添加使用mongo.input.query进行聚合查询的功能将是对这个漂亮的连接器库的一个很好的补充。