kafka: Example source that uses Consumer.createJavaConsumerConnector() from the Kafka API with multiple threads to fetch and process data from a topic on the Kafka brokers
* This is example source that uses Consumer.createJavaConsumerConnector() from the Kafka high-level consumer API, creating multiple threads that attach to a topic on the Kafka brokers, fetch data, and process it. It wraps the work in Runnable objects and uses an ExecutorService to spawn as many threads as the topic has partitions, with each thread consuming from its own stream.
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AvroDataSubscribe implements Serializable {
    private static final long serialVersionUID = -2895832218133628236L;
    private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);

    private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
    // number of streams/worker threads; should match the topic's partition count
    private final int NUM_THREADS = 3;
    // the class name is used as the user_id / group_id
    private final String user_id = this.getClass().getName();
    private final String group_id = this.getClass().getSimpleName();

    public static void main(String[] args) {
        AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
        try {
            avroDataSubscribe.collect();
        } catch (Exception ex) {
            log.debug("exception in main()", ex);
        }
    }

    public void collect() throws Exception {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", Utils.ZOOKEEPER_LIST);
        properties.put("group.id", group_id);
        properties.put("zookeeper.session.timeout.ms", "6000");
        properties.put("zookeeper.sync.time.ms", "2000");
        properties.put("auto.commit.enable", "true");
        properties.put("auto.commit.interval.ms", "5000");
        properties.put("fetch.message.max.bytes", "31457280"); // 30MB
        properties.put("auto.offset.reset", "smallest");

        final ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        // ask for NUM_THREADS streams on the topic: one stream per worker thread
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        for (int m = 0; m < NUM_THREADS; m++) {
            executor.execute(new ConsumerT(streams.get(m)));
        }
    }

    public class ConsumerT implements Runnable {
        private KafkaStream<byte[], byte[]> stream;
        private final SpecificDatumReader<COL_ONEM2M> specificDatumReader =
                new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);

        public ConsumerT(KafkaStream<byte[], byte[]> stream) {
            super();
            this.stream = stream;
        }

        @Override
        public void run() {
            // the stream's iterator blocks until new messages arrive
            for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                byte[] message = messageAndMetadata.message();
                BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
                COL_ONEM2M read = null;
                String task_group_id = "";
                String task_id = "";
                // used by the processing logic omitted below
                String start_time = "";
                String colFrom = "";
                String calcuate_latest_yn = "";
                try {
                    // deserialize the Avro-encoded payload into a COL_ONEM2M record
                    read = specificDatumReader.read(null, binaryDecoder);
                    List<java.lang.CharSequence> data = read.getData();
                    task_group_id = read.getTaskGroupId().toString();
                    task_id = read.getTaskId().toString();
                    // processing logic goes here
                    // .....
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
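One thing the example leaves out is cleanup: collect() returns right after spawning the workers, and neither the ConsumerConnector nor the ExecutorService is ever shut down, so the process can only be stopped by killing it. Below is a minimal shutdown sketch, assuming consumer and executor are promoted from local variables to fields of AvroDataSubscribe (an assumption; they are locals in the code above).

    // minimal sketch: register a JVM shutdown hook for a clean stop
    // (assumes consumer/executor are fields, not locals as in the example)
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            // ConsumerConnector.shutdown() stops the KafkaStream iterators,
            // which lets each ConsumerT.run() loop fall through and exit
            if (consumer != null) consumer.shutdown();
            if (executor != null) {
                executor.shutdown();
                try {
                    // give in-flight messages a few seconds to finish processing
                    if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                }
            }
        }
    }));

Since auto.commit.enable is true, offsets are committed in the background every auto.commit.interval.ms (5 seconds here), so shutting down cleanly also narrows the window of messages that get re-delivered when the consumer group restarts.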