Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.
$match에서 집계 대상을 선정하고 $group에서 그룹핑하는 컬럼을 지정한다.
최종적으로 결과에 포함시킬 컬럼은 $project에서 지정한다(값이 1이면 최종 결과에 포함되고 0이면 포함하지 않음)
// MongoDB연결 try { mongoClient = new MongoClient(new ServerAddress(db_server, Integer.parseInt(db_port))); db = mongoClient.getDB(db_name); table = db.getCollection(collection_name); } catch (Exception ex) { log.debug("MongoDB connection error : "+ex.getMessage()); if(db != null) { db.cleanCursors(true); db = null; } if(table != null) {table = null;} if(mongoClient != null ) { mongoClient.close(); } throw ex; } // 리턴 값 List<Map<String, String>> list = new ArrayList<Map<String, String>>(); // 집계 수행(ty=4이고 _uri가 "TicketCount/status/CONTENT_INST"포함하며 ct가 지금부터 5분전에 해당되는 data에 대해서..) DBObject match = new BasicDBObject(); //"$match", new BasicDBObject("ct", new BasicDBObject("$gte", "20161213T160000"))); match.put("ty",4); match.put("_uri", new BasicDBObject("$regex", "TicketCount/status/CONTENT_INST")); //match.put("ct", new BasicDBObject("$gte", "20161213T160000")); long nowDate = new Date().getTime(); long newDate = nowDate-(5*60*1000); match.put("ct", new BasicDBObject("$gte", Utils.dateFormat.format((new Date(newDate))))); //Forming Group parts(cr컬럼을 기준으로 grouping하고 con값을 sum하여 sum_con컬럼으로 담는다) DBObject group = new BasicDBObject(); group.put("_id", "$cr"); group.put("sum_con", new BasicDBObject("$sum", "$con")); //group.put("sum_con", new BasicDBObject("$sum", 1)); //Forming Project parts(최종적으로 _id값을 cr컬럼으로 뽑아내고, _id는 뽑아내지 않으며 sum_con은 결과로서 뽑아낸다) DBObject project = new BasicDBObject(); project.put("cr","$_id"); project.put("_id",0); project.put("sum_con", 1); try { AggregationOutput output = db.getCollection("resource").aggregate( new BasicDBObject("$match", match), new BasicDBObject("$group", group), new BasicDBObject("$project", project) ); //System.out.println("output : "+output.getCommandResult().getString("result")); Iterator<DBObject> itr = output.results().iterator(); while(itr.hasNext()) { DBObject dbObject =itr.next(); //JSONObject jsonObject = JSONObject.fromObject(dbObject.toString()); //Map<String, String> newMap = castMap(dbObject.toMap(), String.class, String.class); @SuppressWarnings("unchecked") Map<String, String> newMap = makeStringMap(dbObject.toMap()); list.add(newMap); } return list; } catch (Exception e) { log.debug("Exception : "+e.getMessage()); throw e; } finally { if(db != null) { db.cleanCursors(true); table = null; db = null; } if(mongoClient != null ) { mongoClient.close(); } }