Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT 등의 개발/운영 기술을 정리합니다. gooper@gooper.com으로 문의 주세요.
*Spark에서 KafkaUtils를 이용하여 kafka topic에 접근하여 객체로 저장된 값을 가져오고 처리하는 Java 소스이다.
topic의 partition이 3개로 만들어져 있는데 별도의 thread를 만들어서 처리하지 않고 KafkaUtils.createStream()을 사용시 스레드 개수를 지정하여 주면 지정한 개수 만큼의 스레드를 내부적으로 생성하여 broker의 topic에 접근한다.
*샘플프로그램
import java.io.Serializable; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; public class AvroDataSparkSubscribe implements Serializable { private static final long serialVersionUID = 1333478786266564011L; private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString(); private static final Log log = LogFactory.getLog(AvroDataSparkSubscribe.class); private final TripleService tripleService = new TripleService(); private final int NUM_THREADS = 3; private final String user_id =this.getClass().getName(); private final String group_id = this.getClass().getSimpleName(); public static void main(String[] args) { AvroDataSparkSubscribe avroDataSparkSubscribe = new AvroDataSparkSubscribe(); try { avroDataSparkSubscribe.collect(); } catch (Exception ex) { log.debug("exception in main() :"+ex.getStackTrace()); } } public void collect() throws Exception{ SparkConf sc=new SparkConf().setAppName("AvroDataSparkSubscribe") .set("spark.ui.port", "4042") .set("spark.blockManager.port", "38020") .set("spark.broadcast.port", "38021") .set("spark.driver.port", "38022") .set("spark.executor.port", "38023") .set("spark.fileserver.port", "38024") .set("spark.replClassServer.port", "38025") .set("spark.driver.memory", "4g") .set("spark.executor.memory", 
"4g") ; JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10)); Map<String, String> conf = new HashMap<String, String>(); //class name을 user_id, grup_id로 사용함 conf.put("zookeeper.connect",Utils.ZOOKEEPER_LIST); conf.put("group.id",group_id); conf.put("zookeeper.session.timeout.ms", "6000"); conf.put("zookeeper.sync.time.ms", "2000"); conf.put("auto.commit.enable", "true"); conf.put("auto.commit.interval.ms", "5000"); conf.put("fetch.message.max.bytes", "31457280"); // 30MB conf.put("auto.offset.reset", "smallest"); jssc.checkpoint("/tmp"); Map<String, Integer> topic = new HashMap<String, Integer>(); topic.put(TOPIC, NUM_THREADS); try { JavaPairReceiverInputDStream<byte[], byte[]> kafkaStream = KafkaUtils.createStream(jssc,byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, conf, topic, StorageLevel.MEMORY_ONLY()); JavaDStream<byte[]> lines = kafkaStream.map(tuple2 -> tuple2._2()); Function <byte[], String> wrkF = new Function<byte[], String> (){ private static final long serialVersionUID = 4509609657912968079L; public String call(byte[] x) { BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(x, null); SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class); try { COL_ONEM2M read = specificDatumReader.read(null, binaryDecoder); new ConsumerT(read).go(); } catch (Exception e) { log.debug("xxx=>"+e.getMessage()); } return ""; } }; JavaDStream<String> rst = lines.map(wrkF); // action을 위해서... 
rst.print(); jssc.start(); jssc.awaitTermination(); } catch (Exception e) { e.printStackTrace(); log.debug("exception : "+e.getMessage()); } } public class ConsumerT implements Serializable { private static final long serialVersionUID = 7697840079748720000L; private COL_ONEM2M read; public ConsumerT(COL_ONEM2M read) { super(); this.read = read; } public void go(){ StringBuffer sb = new StringBuffer(); String task_group_id = ""; String task_id = ""; String start_time = ""; try { List<java.lang.CharSequence> data= read.getData(); task_group_id = read.getTaskGroupId().toString(); task_id = read.getTaskId().toString(); start_time = read.getStartTime().toString(); // 필요한 로직 ..... } catch (Exception e) { e.printStackTrace(); } // try } // go method } // ConsumerT class }