메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AvroDataSubscribe implements Serializable  {
	private static final long serialVersionUID = -2895832218133628236L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
	
	private final int NUM_THREADS = 3;		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
		try {
			avroDataSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		Properties properties = new Properties();
		
		//class name을 user_id, grup_id로 사용함
		properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
		properties.put("group.id",group_id);
		properties.put("zookeeper.session.timeout.ms", "6000");
		properties.put("zookeeper.sync.time.ms", "2000");
		properties.put("auto.commit.enable", "true");
		properties.put("auto.commit.interval.ms", "5000");
		properties.put("fetch.message.max.bytes", "31457280");		// 30MB		
		properties.put("auto.offset.reset", "smallest");
		
		final ConsumerConnector consumer = 
				Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
		
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(TOPIC, NUM_THREADS);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 	consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
		ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
		
		for (int m = 0; m < NUM_THREADS; m++) {
			executor.execute(new ConsumerT(streams.get(m)));
		}
		
	}
	
	public class ConsumerT implements Runnable {
		private KafkaStream<byte[], byte[]> stream;
		private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
		
		public ConsumerT(KafkaStream<byte[], byte[]> stream) {
			super();
			this.stream = stream;
		}
		
		@Override
		public void run(){
			for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
				
				StringBuffer sb = new StringBuffer();
					
				byte[] message = (byte[]) messageAndMetadata.message();
				
				BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
				COL_ONEM2M read = null;
				String task_group_id = "";
				String task_id =  "";
				String start_time =  "";
				String colFrom =  "";
				String calcuate_latest_yn =  "";
				 
				try {
					 read = specificDatumReader.read(null, binaryDecoder);
					 
					 List<java.lang.CharSequence> data= read.getData();
					 
					 task_group_id = read.getTaskGroupId().toString();
					 task_id = read.getTaskId().toString();
						
				// 처리에 필요한 로직
				// .....
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
	}
}

번호 제목 날짜 조회 수
501 Impala Admission Control 설정시 쿼리가 사용하는 메모리 사용량 판단 방법 2023.05.19 672
500 초기 오류(java.lang.NoSuchMethodError)에 따른 후속 작업에서 오류(java.lang.NoClassDefFoundError)가 발생되는 상황(quartz에서 주기적으로 작업시) 2016.08.29 670
499 compile할때와 exclude할때 대상을 표현하는 명칭이 다르므로 주의할것 2016.08.10 669
498 동시에 많은 요청이 endpoint로 몰려서java.net.NoRouteToHostException가 발생하는 경우의 처리방법 2016.10.17 664
497 Job이 끝난 log을 볼수 있도록 설정하기 2016.05.30 664
496 [HDFS]Encryption Zone에 생성된 테이블 조회시 Failed to open HDFS file hdfs://nameservice1/tmp/zone1/sec_test_file.txt Error(255): Unknown error 255 Root cause: AuthorizationException: User:impala not allowd to do 'DECRYPT_EEK' on 'testkey' 2023.06.29 663
495 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 2016.11.08 663
494 [Atlas Server]org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions (user=atlas/node01.gooper.com@GOOPER.COM, scope=default:atlas_janus, params=[table=default:atlas_janus,], action-CREATE)] 2023.05.15 659
493 kudu의 내부 table명 변경하는 방법 2022.11.10 656
492 Error: java.lang.RuntimeException: java.lang.OutOfMemoryError 오류가 발생하는 경우 2018.09.20 654
491 db를 통째로 새로운 이름의 db로 복사하는 방법/절차 2017.11.14 652
490 Error: Could not find or load main class nodemnager 가 발생할때 해결하는 방법 2015.06.05 652
489 [Kudu]ERROR: Unable to advance iterator for node with id '2' for Kudu table 'impala::core.pm0_abdasubjct': Network error: recv error from unknown peer: Transport endpoint is not connected (error 107) 2023.03.16 647
488 [Hive canary]Hive에 Metastore canary red alert및 hive log파일에 Duplicate entry '123456' for key 'NOTIFICATION_LOG_EVENT_ID'가 발생시 조치사항 2023.03.10 647
487 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 2017.04.06 646
486 spark-submit 실행시 "java.lang.OutOfMemoryError: Java heap space"발생시 조치사항 2018.02.01 641
485 Scala에서 countByWindow를 이용하기(예제) 2018.03.08 631
484 spark-shell실행시 "A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection."오류가 발생하는 경우 해결방법 2016.05.20 627
483 Query 1234:1234 expired due to client inactivity(timeout is 5m)및 invalid query handle 2022.06.10 626
482 hive metadata(hive, impala, kudu 정보가 있음) 테이블에서 db, table, owner, location를 조회하는 쿼리 2020.02.07 622
위로