Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.
kafka Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스
* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable 객체를 만들고 ExecutorService를 이용하여 topic에 지정된 partition 개수만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | import java.io.Serializable; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class AvroDataSubscribe implements Serializable { private static final long serialVersionUID = -2895832218133628236L; private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString(); private static final Log log = LogFactory.getLog(AvroDataSubscribe. 
class ); private final int NUM_THREADS = 3 ; private final String user_id = this .getClass().getName(); private final String group_id = this .getClass().getSimpleName(); public static void main(String[] args) { AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe(); try { avroDataSubscribe.collect(); } catch (Exception ex) { log.debug( "exception in main() :" +ex.getStackTrace()); } } public void collect() throws Exception{ Properties properties = new Properties(); //class name을 user_id, grup_id로 사용함 properties.put( "zookeeper.connect" ,Utils.ZOOKEEPER_LIST); properties.put( "group.id" ,group_id); properties.put( "zookeeper.session.timeout.ms" , "6000" ); properties.put( "zookeeper.sync.time.ms" , "2000" ); properties.put( "auto.commit.enable" , "true" ); properties.put( "auto.commit.interval.ms" , "5000" ); properties.put( "fetch.message.max.bytes" , "31457280" ); // 30MB properties.put( "auto.offset.reset" , "smallest" ); final ConsumerConnector consumer = Consumer.createJavaConsumerConnector( new ConsumerConfig(properties)); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TOPIC, NUM_THREADS); Map<String, List<KafkaStream< byte [], byte []>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream< byte [], byte []>> streams = consumerMap.get(TOPIC); ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); for ( int m = 0 ; m < NUM_THREADS; m++) { executor.execute( new ConsumerT(streams.get(m))); } } public class ConsumerT implements Runnable { private KafkaStream< byte [], byte []> stream; private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M. 
class ); public ConsumerT(KafkaStream< byte [], byte []> stream) { super (); this .stream = stream; } @Override public void run(){ for (MessageAndMetadata< byte [], byte []> messageAndMetadata : stream) { StringBuffer sb = new StringBuffer(); byte [] message = ( byte []) messageAndMetadata.message(); BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null ); COL_ONEM2M read = null ; String task_group_id = "" ; String task_id = "" ; String start_time = "" ; String colFrom = "" ; String calcuate_latest_yn = "" ; try { read = specificDatumReader.read( null , binaryDecoder); List<java.lang.CharSequence> data= read.getData(); task_group_id = read.getTaskGroupId().toString(); task_id = read.getTaskId().toString(); // 처리에 필요한 로직 // ..... } catch (Exception e) { e.printStackTrace(); } } } } } |