메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
	private static final long serialVersionUID = -2895832218133628236L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
	
	private final int NUM_THREADS = 3;		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
		try {
			avroDataSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		Properties properties = new Properties();
		
		//class name을 user_id, grup_id로 사용함
		properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
		properties.put("group.id",group_id);
		properties.put("zookeeper.session.timeout.ms", "6000");
		properties.put("zookeeper.sync.time.ms", "2000");
		properties.put("auto.commit.enable", "true");
		properties.put("auto.commit.interval.ms", "5000");
		properties.put("fetch.message.max.bytes", "31457280");		// 30MB		
		properties.put("auto.offset.reset", "smallest");
		
		final ConsumerConnector consumer = 
				Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(TOPIC, NUM_THREADS);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 	consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
		ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
		
		for (int m = 0; m < NUM_THREADS; m++) {
			executor.execute(new ConsumerT(streams.get(m)));
		}
		
	}
	
	public class ConsumerT implements Runnable {
		private KafkaStream<byte[], byte[]> stream;
		private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
		
		public ConsumerT(KafkaStream<byte[], byte[]> stream) {
			super();
			this.stream = stream;
		}
		
		@Override
		public void run(){
			for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
				
				StringBuffer sb = new StringBuffer();
					
				byte[] message = (byte[]) messageAndMetadata.message();
				
				BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
				COL_ONEM2M read = null;
				String task_group_id = "";
				String task_id =  "";
				String start_time =  "";
				String colFrom =  "";
				String calcuate_latest_yn =  "";
				 
				try {
					 read = specificDatumReader.read(null, binaryDecoder);
					 
					 List<java.lang.CharSequence> data= read.getData();
					 
					 task_group_id = read.getTaskGroupId().toString();
					 task_id = read.getTaskId().toString();
						
				// 처리에 필요한 로직
				// .....
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

번호 제목 날짜 조회 수
401 mongodb에서 큰데이타 sort시 오류발생에 대한 해결방법 2015.12.22 508
400 우분투 16.04LTS에 Jupyter설치 2018.04.17 505
399 fuseki에서 제공하는 script중 s-post를 사용하는 예문 2017.09.15 504
398 Java 8에서 pom.xml에 JavaDoc 관련 태그가 설정되어 있으나 오류등으로 실패하면 나머지 Maven작업이 종료되는 문제 해결 방법 2017.01.24 504
397 Scala버젼 변경 혹은 상황에 맞게 Spark소스 컴파일하기 2016.05.31 504
396 Scala를 이용한 Streaming예제 2018.03.08 502
395 java.lang.IllegalArgumentException: Does not contain a valid host:port authority: master 오류해결방법 2015.05.06 502
394 DeviceType이 o:motion-sensor_33 이거나 o:motion-sensor_32 경우의 sparql문장은 다음과 같다. 2017.08.16 499
393 update(update와 delete->insert)사용시 주의/참고사항 2016.01.06 499
392 fuseki에 update하는 방법(java api이용)및 주의 사항 2015.12.30 496
391 [개발] 온라인 IDE - 개발 환경 구축 없어 어디서나 웹브라우저로 개발하기 2022.05.02 495
390 [tomcat] logrotate를 이용하여 catalina.out로그파일 일별로 로테이션 저장하기 file 2017.01.18 494
389 System Properties Comparison Elasticsearch vs. Hive vs. Jena file 2016.03.10 494
388 fuseki가 제공하는 web ui를 통해서 dataset를 remove->create할 경우 동일한 동일한 이름으로 지정했을때 fuseki-server.jar가 뜨지 않는 현상 2017.02.03 493
387 source, sink를 직접 구현하여 사용하는 예시 2019.05.30 492
386 like검색한 결과를 기준으로 집계를 수행하는 java 소스 2016.12.19 492
385 S2RDF를 실행부분만 추출하여 1건의 triple data를 HDFS에 등록, sparql을 sql로 변환, sql실행하는 방법및 S2RDF소스 컴파일 방법 2016.06.15 492
384 [application수행 로그]Failed to read the application application_123456789012_123456시 조치 방법 2022.03.21 491
383 Hadoop의 Datanode를 Decommission하고 나서 HBase의 regionservers파일에 해당 노드명을 지웠는데 여전히 "Dead regionser"로 표시되는 경우 처리 2018.01.25 490
382 sparql 문법구조 설명 file 2015.12.09 490
위로