메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


*Spark에서 KafkaUtils를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 Java 소스이다.

topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.


*샘플프로그램

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDataSparkSubscribe implements Serializable {
	private static final long serialVersionUID = 1333478786266564011L;
	private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
	private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class);
	
	private final TripleService tripleService = new TripleService();	
	private final int NUM_THREADS = 3;
		
	private final String user_id =this.getClass().getName();
	private final String group_id = this.getClass().getSimpleName();
	
	public static void main(String[] args) {
		AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe();
		try {
			avroDataSparkSubscribe.collect();
		} catch (Exception ex) {
			log.debug("exception in main() :"+ex.getStackTrace());
		}
	}

	public void collect() throws Exception{
		SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe")
				 .set("spark.ui.port", "4042")
				 .set("spark.blockManager.port", "38020")
				 .set("spark.broadcast.port", "38021")
				 .set("spark.driver.port", "38022")
				 .set("spark.executor.port", "38023")
				 .set("spark.fileserver.port", "38024")
				 .set("spark.replClassServer.port", "38025")
				 .set("spark.driver.memory", "4g")
				 .set("spark.executor.memory", "4g")
				 ;
		JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

		Map<String, String> conf = new HashMap<String, String>();
				//class name을 user_id, grup_id로 사용함
				conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
				conf.put("group.id",group_id);
				conf.put("zookeeper.session.timeout.ms", "6000");
				conf.put("zookeeper.sync.time.ms", "2000");
				conf.put("auto.commit.enable", "true");
				conf.put("auto.commit.interval.ms", "5000");
				conf.put("fetch.message.max.bytes", "31457280");		// 30MB		
				conf.put("auto.offset.reset", "smallest");
		
		jssc.checkpoint("/tmp");
		Map<String, Integer> topic = new HashMap<String, Integer>();
		topic.put(TOPIC, NUM_THREADS);

		try {
			JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY());
		    JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2());
		    
			Function <byte[], String> wrkF =
					  new Function<byte[], String> (){
						private static final long serialVersionUID = 4509609657912968079L;

						public String call(byte[] x) {
							BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null);
							SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
							try {
								COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder);								
								new ConsumerT(read).go();
							} catch (Exception e) {
								log.debug("xxx=>"+e.getMessage());
							}
							return "";							
						}
					  };
					  
			JavaDStream<String> rst = lines.map(wrkF);
			
			// action을 위해서...
			rst.print();
			
			jssc.start();
			jssc.awaitTermination();
		} catch (Exception e) {
			e.printStackTrace();
		  log.debug("exception : "+e.getMessage());
		}
	}
	
	public class ConsumerT implements Serializable {
		private static final long serialVersionUID = 7697840079748720000L;
		private COL_ONEM2M read;
		
		public ConsumerT(COL_ONEM2M read) {
			super();
			this.read = read;
		}
		
		public void go(){
			StringBuffer sb = new StringBuffer();
				
			String task_group_id = "";
			String task_id =  "";
			String start_time =  "";
			
			try {
				 List<java.lang.CharSequence> data= read.getData();
				 
				 task_group_id = read.getTaskGroupId().toString();
				 task_id = read.getTaskId().toString();
				 start_time = read.getStartTime().toString();

				 // 필요한 로직 ..... 
				 
			} catch (Exception e) {
				e.printStackTrace();
			} // try
		} // go method
	} // ConsumerT class
}

번호 제목 날짜 조회 수
281 HDFS에서 quota 설정 방법및 확인 방법 2022.03.30 399
280 권한회수 및 권한부여 명령 몇가지 2017.11.16 397
279 sendmail + dovecot(pop3) + saslauthd 설치 2017.06.11 396
278 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 2017.05.02 396
277 우분투 16.04 설치후 APM (Apache2, PHP, MySQL) 설치 2017.01.29 394
276 drools에서 drl관련 로그를 기록하기 위한 클래스 파일 2016.07.21 394
275 lagom의 online-auction-java프로젝트 실행시 외부의 kafka/cassandra를 사용하도록 설정하는 방법 2017.10.12 393
274 fuseki webUI를 통해서 전체 카운트를 하면 급격하게 메모리를 소모해 버리는 문제가 있음 file 2017.04.28 393
» Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 393
272 Cluster Install -> Provide Login Credentials에서 root가 아닌 다른 사용자를 지정하는 경우 "Exhausted available authentication methods"오류 발생시 조치방법 2018.05.22 392
271 editLog의 문제로 발생하는 journalnode 기동 오류 발생시 조치사항 2017.09.14 390
270 No broker partitions consumed by consumer thread오류 발생시 확인/조치할 사항 2016.09.02 388
269 [CentOS] 네트워크 설정 2018.03.26 387
268 spark-submit으로 spark application실행하는 다양한 방법 2016.05.25 387
267 schema.xml vs managed-schema 지정 사용하기 - 두개를 동시에 사용할 수는 없음 2017.07.09 385
266 이미지 관리 오픈소스 목록 2018.03.11 382
265 ResultSet에서 데이타를 List<Map<String,String>>형태로 만들어서 리턴하는 소스(Collections.sort를 이용한 정렬 가능) 2016.12.15 381
264 [sbt] sbt 0.13.11 를 windows에 설치하고 scala프로그램을 compile해서 jar파일 만들기 2016.07.11 380
263 쿠버네티스(k8s) 설치 및 클러스터 구성하기 2019.10.19 379
262 sparql 1.1 BIND(if() as ?bind변수) 버그로 추정되는 문제점및 해결방안 -> select 문에 (if(,,) as ?bind변수) file 2016.01.21 379
위로