메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


*Spark에서 KafkaUtils를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 Java 소스이다.

topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.


*샘플프로그램

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDataSparkSubscribe implements Serializable {
	private static final long serialVersionUID = 1333478786266564011L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class);
	
	private final TripleService tripleService = new TripleService();	
	private final int NUM_THREADS = 3;
		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe();
		try {
			avroDataSparkSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe")
				 .set("spark.ui.port", "4042")
				 .set("spark.blockManager.port", "38020")
				 .set("spark.broadcast.port", "38021")
				 .set("spark.driver.port", "38022")
				 .set("spark.executor.port", "38023")
				 .set("spark.fileserver.port", "38024")
				 .set("spark.replClassServer.port", "38025")
				 .set("spark.driver.memory", "4g")
				 .set("spark.executor.memory", "4g")
				 ;
		JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

		Map<String, String> conf = new HashMap<String, String>();
				//class name을 user_id, grup_id로 사용함
				conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
				conf.put("group.id",group_id);
				conf.put("zookeeper.session.timeout.ms", "6000");
				conf.put("zookeeper.sync.time.ms", "2000");
				conf.put("auto.commit.enable", "true");
				conf.put("auto.commit.interval.ms", "5000");
				conf.put("fetch.message.max.bytes", "31457280");		// 30MB		
				conf.put("auto.offset.reset", "smallest");
		
		jssc.checkpoint("/tmp");
		Map<String, Integer> topic = new HashMap<String, Integer>();
		topic.put(TOPIC, NUM_THREADS);

		try {
			JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY());
		    JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2());
		    
			Function <byte[], String> wrkF =
					  new Function<byte[], String> (){
						private static final long serialVersionUID = 4509609657912968079L;

						public String call(byte[] x) {
							BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null);
							SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
							try {
								COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder);								
								new ConsumerT(read).go();
							} catch (Exception e) {
								log.debug("xxx=>"+e.getMessage());
							}
							return "";							
						}
					  };
					  
			JavaDStream<String> rst = lines.map(wrkF);
			
			// action을 위해서...
			rst.print();
			
			jssc.start();
			jssc.awaitTermination();
		} catch (Exception e) {
			e.printStackTrace();
		  log.debug("exception : "+e.getMessage());
		}
	}
	
	public class ConsumerT implements Serializable {
		private static final long serialVersionUID = 7697840079748720000L;
		private COL_ONEM2M read;
		
		public ConsumerT(COL_ONEM2M read) {
			super();
			this.read = read;
		}
		
		public void go(){
			StringBuffer sb = new StringBuffer();
				
			String task_group_id = "";
			String task_id =  "";
			String start_time =  "";
			
			try {
				 List<java.lang.CharSequence> data= read.getData();
				 
				 task_group_id = read.getTaskGroupId().toString();
				 task_id = read.getTaskId().toString();
				 start_time = read.getStartTime().toString();

				 // 필요한 로직 ..... 
				 
			} catch (Exception e) {
				e.printStackTrace();
			} // try
		} // go method
	} // ConsumerT class
}

번호 제목 날짜 조회 수
450 java.util.NoSuchElementException발생시 조치 2014.08.27 4229
449 sentry설정 방법및 활성화시 설정이 필요한 파일및 설정값, 계정생성 방법 2018.08.16 4227
448 json 값 다루기 2014.04.17 4217
447 No broker partitions consumed by consumer thread오류 발생시 확인/조치할 사항 2016.09.02 4214
446 Apache Spark와 Drools를 이용한 CEP구현 테스트 2016.07.15 4212
445 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 4211
444 [TLS/SSL]Kudu Tablet Server설정 2022.05.13 4208
443 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 2016.10.24 4207
442 Windows7 64bit 환경에서 Apache Hadoop 2.7.1설치하기 2017.07.26 4206
441 cloudera서비스 중지및 기동순서 2020.02.14 4203
440 하둡 클러스터 전체 노드를 다시 기동하면 invalidate metadata를 수행해야 데이터가 틀어지지 않는다. 2019.05.20 4200
439 conda를 이용한 jupyterhub(v0.9)및 jupyter설치 (v4.4.0) 2018.07.30 4199
438 [sentry]role부여후 테이블명이 변경되어 오류가 발생할때 조치방법 2018.10.16 4198
437 AIX 7.1에서 hive실행시 "hive: line 86: readlink: command not found" 오류가 발생시 임시 조치사항 2016.09.25 4195
436 [TLS/SSL]Cloudera 6.3.4기준 Oozie Web UI TLS설정 항목및 설정값 2022.05.13 4194
435 [Sentry] sentry메타 DB를 이용하여 테이블에 매핑되어 있는 role명칭 찾는 방법. 2022.06.22 4191
434 hadoop의 data디렉토리를 변경하는 방법 2014.08.24 4179
433 mongodb에서 큰데이타 sort시 오류발생에 대한 해결방법 2015.12.22 4176
432 vuestorefrontui.io를 이용한 front end project 생성하기 2022.02.06 4175
431 centos 5.X에 hadoop 2.0.5 alpha 설치 2013.12.16 4171
위로