메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


*Spark에서 KafkaUtils를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 Java 소스이다.

topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.


*샘플프로그램

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDataSparkSubscribe implements Serializable {
private static final long serialVersionUID = 1333478786266564011L;
private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class);

private final TripleService tripleService = new TripleService();
private final int NUM_THREADS = 3;

private final String user_id =this.getClass().getName();
private final String group_id = this.getClass().getSimpleName();

public static void main(String[] args) {
AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe();
try {
avroDataSparkSubscribe.collect();
} catch (Exception ex) {
log.debug("exception in main() :"+ex.getStackTrace());
}
}

public void collect() throws Exception{
SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe")
 .set("spark.ui.port", "4042")
 .set("spark.blockManager.port", "38020")
 .set("spark.broadcast.port", "38021")
 .set("spark.driver.port", "38022")
 .set("spark.executor.port", "38023")
 .set("spark.fileserver.port", "38024")
 .set("spark.replClassServer.port", "38025")
 .set("spark.driver.memory", "4g")
 .set("spark.executor.memory", "4g")
 ;
JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

Map<String, String> conf = new HashMap<String, String>();
//class name을 user_id, grup_id로 사용함
conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
conf.put("group.id",group_id);
conf.put("zookeeper.session.timeout.ms", "6000");
conf.put("zookeeper.sync.time.ms", "2000");
conf.put("auto.commit.enable", "true");
conf.put("auto.commit.interval.ms", "5000");
conf.put("fetch.message.max.bytes", "31457280"); // 30MB
conf.put("auto.offset.reset", "smallest");

jssc.checkpoint("/tmp");
Map<String, Integer> topic = new HashMap<String, Integer>();
topic.put(TOPIC, NUM_THREADS);

try {
JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY());
    JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2());
    
Function <byte[], String> wrkF =
  new Function<byte[], String> (){
private static final long serialVersionUID = 4509609657912968079L;

public String call(byte[] x) {
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null);
SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
try {
COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder);
new ConsumerT(read).go();
} catch (Exception e) {
log.debug("xxx=>"+e.getMessage());
}
return "";
}
  };
  
JavaDStream<String> rst = lines.map(wrkF);

// action을 위해서...
rst.print();

jssc.start();
jssc.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
  log.debug("exception : "+e.getMessage());
}
}

public class ConsumerT implements Serializable {
private static final long serialVersionUID = 7697840079748720000L;
private COL_ONEM2M read;

public ConsumerT(COL_ONEM2M read) {
super();
this.read = read;
}

public void go(){
StringBuffer sb = new StringBuffer();

String task_group_id = "";
String task_id =  "";
String start_time =  "";

try {
 List<java.lang.CharSequence> data= read.getData();
 
 task_group_id = read.getTaskGroupId().toString();
 task_id = read.getTaskId().toString();
 start_time = read.getStartTime().toString();

 // 필요한 로직 ..... 
 
} catch (Exception e) {
e.printStackTrace();
} // try
} // go method
} // ConsumerT class
}

번호 제목 날짜 조회 수
212 갑자기 DataNode가 java.io.IOException: Premature EOF from inputStream를 반복적으로 발생시키다가 java.lang.OutOfMemoryError: Java heap space를 내면서 죽는 경우 조치방법 2017.07.19 7001
211 Current heap configuration for MemStore and BlockCache exceeds the threshold required for successful cluster operation 2017.07.18 4460
210 HBase 설정 최적화하기(VCNC) file 2017.07.18 3940
209 HBase write 성능 튜닝 file 2017.07.18 3811
208 mysql에서 외부 디비를 커넥션할 경우 접속 속도가 느려질때 2017.06.30 6358
207 Not enough replica available for query at consistency QUORUM가 발생하는 경우 2017.06.21 5067
206 cassandra cluster 문제가 있는 node제거 하기(DN상태의 노드가 있으면 cassandra cluster 전체에 문제가 발생하므로 반드시 제거할것) 2017.06.21 4567
205 lagom을 이용한 샘플 경매 프로그램 실행방법 2017.06.20 5035
204 mysql-server 기동시 Do you already have another mysqld server running on port 오류 발생할때 확인및 조치방법 2017.05.14 6262
203 mapreduce appliction을 실행시 "is running beyond virtual memory limits" 오류 발생시 조치사항 2017.05.04 22526
202 Mysql DB 생성 및 권한. 특정아이피, 대역에 대한 접근 허용 2017.05.04 6378
201 Hive MetaStore Server기동시 Could not create "increment"/"table" value-generation container SEQUENCE_TABLE since autoCreate flags do not allow it. 오류발생시 조치사항 2017.05.03 3992
200 Ubuntu 16.04 LTS에 Hive 2.1.1설치하면서 "Version information not found in metastore"발생하는 오류원인및 조치사항 2017.05.03 4017
199 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적(?)으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 5235
198 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 5328
197 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 4831
196 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 4148
195 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 2017.05.02 5276
194 Ubuntu 16.04 LTS에 MariaDB 10.1설치 및 포트변경 및 원격접속 허용 2017.05.01 5707
193 Ubuntu 16.04 LTS에 4대에 Hadoop 2.8.0설치 2017.05.01 5176
위로