메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


*Spark에서 KafkaUtils를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 Java 소스이다.

topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.


*샘플프로그램

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDataSparkSubscribe implements Serializable {
private static final long serialVersionUID = 1333478786266564011L;
private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class);

private final TripleService tripleService = new TripleService();
private final int NUM_THREADS = 3;

private final String user_id =this.getClass().getName();
private final String group_id = this.getClass().getSimpleName();

public static void main(String[] args) {
AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe();
try {
avroDataSparkSubscribe.collect();
} catch (Exception ex) {
log.debug("exception in main() :"+ex.getStackTrace());
}
}

public void collect() throws Exception{
SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe")
 .set("spark.ui.port", "4042")
 .set("spark.blockManager.port", "38020")
 .set("spark.broadcast.port", "38021")
 .set("spark.driver.port", "38022")
 .set("spark.executor.port", "38023")
 .set("spark.fileserver.port", "38024")
 .set("spark.replClassServer.port", "38025")
 .set("spark.driver.memory", "4g")
 .set("spark.executor.memory", "4g")
 ;
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

Map<String, String> conf = new HashMap<String, String>();
//class name을 user_id, grup_id로 사용함
conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
conf.put("group.id",group_id);
conf.put("zookeeper.session.timeout.ms", "6000");
conf.put("zookeeper.sync.time.ms", "2000");
conf.put("auto.commit.enable", "true");
conf.put("auto.commit.interval.ms", "5000");
conf.put("fetch.message.max.bytes", "31457280"); // 30MB
conf.put("auto.offset.reset", "smallest");

jssc.checkpoint("/tmp");
Map<String, Integer> topic = new HashMap<String, Integer>();
topic.put(TOPIC, NUM_THREADS);

try {
JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY());
    JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2());
    
Function <byte[], String> wrkF =
  new Function<byte[], String> (){
private static final long serialVersionUID = 4509609657912968079L;

public String call(byte[] x) {
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null);
SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
try {
COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder);
new ConsumerT(read).go();
} catch (Exception e) {
log.debug("xxx=>"+e.getMessage());
}
return "";
}
  };
  
JavaDStream<String> rst = lines.map(wrkF);

// action을 위해서...
rst.print();

jssc.start();
jssc.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
  log.debug("exception : "+e.getMessage());
}
}

public class ConsumerT implements Serializable {
private static final long serialVersionUID = 7697840079748720000L;
private COL_ONEM2M read;

public ConsumerT(COL_ONEM2M read) {
super();
this.read = read;
}

public void go(){
StringBuffer sb = new StringBuffer();

String task_group_id = "";
String task_id =  "";
String start_time =  "";

try {
 List<java.lang.CharSequence> data= read.getData();
 
 task_group_id = read.getTaskGroupId().toString();
 task_id = read.getTaskId().toString();
 start_time = read.getStartTime().toString();

 // 필요한 로직 ..... 
 
} catch (Exception e) {
e.printStackTrace();
} // try
} // go method
} // ConsumerT class
}

번호 제목 날짜 조회 수
» Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 2905
190 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 4407
189 Spark에서 Serializable관련 오류및 조치사항 2017.04.21 8170
188 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 2017.04.19 5082
187 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 2017.04.06 4570
186 streaming작업시 입력된 값에 대한 사본을 만들게 되는데 이것이 실패했을때 발생하는 경고메세지 2017.04.03 4190
185 JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 2017.03.30 2676
184 kafka-manager 1.3.3.4 설정및 실행하기 2017.03.20 5075
183 spark 2.0.0의 api를 이용하는 예제 프로그램 2017.03.15 3629
182 It is indirectly referenced from required .class files 오류 발생시 조치방법 2017.03.09 4386
181 spark2.0.0에서 hive 2.0.1 table을 읽어 출력하는 예제 소스(HiveContext, SparkSession, SQLContext) 2017.03.09 3696
180 spark에서 hive table을 읽어 출력하는 예제 소스 2017.03.09 4932
179 spark에서 hive table을 읽어 출력하는 예제 소스 2017.03.09 3890
178 서버중 slave,worker,regionserver만 재기동해야 할때 필요한 기동스크립트및 사용방법 2017.02.03 4121
177 테이블의 row수를 빠르게 카운트 하는 방법 2017.01.26 2632
176 HDFS상의 /tmp폴더에 Permission denied오류가 발생시 조치사항 2017.01.25 2845
175 [JSON 파싱]mongodb의 document를 GSON을 이용하여 parsing할때 ObjectId값에서 오류 발생시 조치방법 2017.01.18 5559
174 spark 2.0.0를 windows에서 실행시 로컬 파일을 읽을때 발생하는 오류 해결 방법 2017.01.12 3477
173 new Gson().toJson(new ObjectId())을 사용하면 값이 다르게 나오는 경우가 있음 2016.12.23 4445
172 like검색한 결과를 기준으로 집계를 수행하는 java 소스 2016.12.19 4283
위로