메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


*Spark에서 KafkaUtils를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 Java 소스이다.

topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.


*샘플프로그램

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDataSparkSubscribe implements Serializable {
	private static final long serialVersionUID = 1333478786266564011L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class);
	
	private final TripleService tripleService = new TripleService();	
	private final int NUM_THREADS = 3;
		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe();
		try {
			avroDataSparkSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe")
				 .set("spark.ui.port", "4042")
				 .set("spark.blockManager.port", "38020")
				 .set("spark.broadcast.port", "38021")
				 .set("spark.driver.port", "38022")
				 .set("spark.executor.port", "38023")
				 .set("spark.fileserver.port", "38024")
				 .set("spark.replClassServer.port", "38025")
				 .set("spark.driver.memory", "4g")
				 .set("spark.executor.memory", "4g")
				 ;
		JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

		Map<String, String> conf = new HashMap<String, String>();
				//class name을 user_id, grup_id로 사용함
				conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
				conf.put("group.id",group_id);
				conf.put("zookeeper.session.timeout.ms", "6000");
				conf.put("zookeeper.sync.time.ms", "2000");
				conf.put("auto.commit.enable", "true");
				conf.put("auto.commit.interval.ms", "5000");
				conf.put("fetch.message.max.bytes", "31457280");		// 30MB		
				conf.put("auto.offset.reset", "smallest");
		
		jssc.checkpoint("/tmp");
		Map<String, Integer> topic = new HashMap<String, Integer>();
		topic.put(TOPIC, NUM_THREADS);

		try {
			JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY());
		    JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2());
		    
			Function <byte[], String> wrkF =
					  new Function<byte[], String> (){
						private static final long serialVersionUID = 4509609657912968079L;

						public String call(byte[] x) {
							BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null);
							SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
							try {
								COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder);								
								new ConsumerT(read).go();
							} catch (Exception e) {
								log.debug("xxx=>"+e.getMessage());
							}
							return "";							
						}
					  };
					  
			JavaDStream<String> rst = lines.map(wrkF);
			
			// action을 위해서...
			rst.print();
			
			jssc.start();
			jssc.awaitTermination();
		} catch (Exception e) {
			e.printStackTrace();
		  log.debug("exception : "+e.getMessage());
		}
	}
	
	public class ConsumerT implements Serializable {
		private static final long serialVersionUID = 7697840079748720000L;
		private COL_ONEM2M read;
		
		public ConsumerT(COL_ONEM2M read) {
			super();
			this.read = read;
		}
		
		public void go(){
			StringBuffer sb = new StringBuffer();
				
			String task_group_id = "";
			String task_id =  "";
			String start_time =  "";
			
			try {
				 List<java.lang.CharSequence> data= read.getData();
				 
				 task_group_id = read.getTaskGroupId().toString();
				 task_id = read.getTaskId().toString();
				 start_time = read.getStartTime().toString();

				 // 필요한 로직 ..... 
				 
			} catch (Exception e) {
				e.printStackTrace();
			} // try
		} // go method
	} // ConsumerT class
}

번호 제목 날짜 조회 수
401 mongodb에서 큰데이타 sort시 오류발생에 대한 해결방법 2015.12.22 508
400 우분투 16.04LTS에 Jupyter설치 2018.04.17 505
399 fuseki에서 제공하는 script중 s-post를 사용하는 예문 2017.09.15 504
398 Java 8에서 pom.xml에 JavaDoc 관련 태그가 설정되어 있으나 오류등으로 실패하면 나머지 Maven작업이 종료되는 문제 해결 방법 2017.01.24 504
397 Scala버젼 변경 혹은 상황에 맞게 Spark소스 컴파일하기 2016.05.31 504
396 Scala를 이용한 Streaming예제 2018.03.08 502
395 java.lang.IllegalArgumentException: Does not contain a valid host:port authority: master 오류해결방법 2015.05.06 502
394 DeviceType이 o:motion-sensor_33 이거나 o:motion-sensor_32 경우의 sparql문장은 다음과 같다. 2017.08.16 499
393 update(update와 delete->insert)사용시 주의/참고사항 2016.01.06 499
392 fuseki에 update하는 방법(java api이용)및 주의 사항 2015.12.30 496
391 [개발] 온라인 IDE - 개발 환경 구축 없어 어디서나 웹브라우저로 개발하기 2022.05.02 495
390 [tomcat] logrotate를 이용하여 catalina.out로그파일 일별로 로테이션 저장하기 file 2017.01.18 494
389 System Properties Comparison Elasticsearch vs. Hive vs. Jena file 2016.03.10 494
388 fuseki가 제공하는 web ui를 통해서 dataset를 remove->create할 경우 동일한 동일한 이름으로 지정했을때 fuseki-server.jar가 뜨지 않는 현상 2017.02.03 493
387 [application수행 로그]Failed to read the application application_123456789012_123456시 조치 방법 2022.03.21 492
386 source, sink를 직접 구현하여 사용하는 예시 2019.05.30 492
385 like검색한 결과를 기준으로 집계를 수행하는 java 소스 2016.12.19 492
384 S2RDF를 실행부분만 추출하여 1건의 triple data를 HDFS에 등록, sparql을 sql로 변환, sql실행하는 방법및 S2RDF소스 컴파일 방법 2016.06.15 492
383 [Cloudera 6.3.4, Kudu]]Service Monitor에서 사용하는 metric중에 일부를 blacklist로 설정하여 모니터링 정보 수집 제외하는 방법 2022.07.08 490
382 Hadoop의 Datanode를 Decommission하고 나서 HBase의 regionservers파일에 해당 노드명을 지웠는데 여전히 "Dead regionser"로 표시되는 경우 처리 2018.01.25 490
위로