메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스이다. Runnable객채를 만들고 ExecutorService를 이용하여 topic에 지정된 partition개수 만큼의 쓰레드를 생성하여 쓰레드로 작업하도록 되어있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class AvroDataSubscribe implements Serializable  {
    private static final long serialVersionUID = -2895832218133628236L;
    private final String TOPIC = Utils.KafkaTopics.COL_ONEM2M.toString();
    private static final Log log = LogFactory.getLog(AvroDataSubscribe.class);
     
    private final int NUM_THREADS = 3;     
    private final String user_id =this.getClass().getName();
    private final String group_id = this.getClass().getSimpleName();
     
    public static void main(String[] args) {
        AvroDataSubscribe avroDataSubscribe = new AvroDataSubscribe();
        try {
            avroDataSubscribe.collect();
        } catch (Exception ex) {
            log.debug("exception in main() :"+ex.getStackTrace());
        }
    }
 
    public void collect() throws Exception{
        Properties properties = new Properties();
         
        //class name을 user_id, grup_id로 사용함
        properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
        properties.put("group.id",group_id);
        properties.put("zookeeper.session.timeout.ms", "6000");
        properties.put("zookeeper.sync.time.ms", "2000");
        properties.put("auto.commit.enable", "true");
        properties.put("auto.commit.interval.ms", "5000");
        properties.put("fetch.message.max.bytes", "31457280");      // 30MB    
        properties.put("auto.offset.reset", "smallest");
         
        final ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
         
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =  consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
         
        for (int m = 0; m < NUM_THREADS; m++) {
            executor.execute(new ConsumerT(streams.get(m)));
        }
         
    }
     
    public class ConsumerT implements Runnable {
        private KafkaStream<byte[], byte[]> stream;
        private final SpecificDatumReader<COL_ONEM2M> specificDatumReader = new SpecificDatumReader<COL_ONEM2M>(COL_ONEM2M.class);
         
        public ConsumerT(KafkaStream<byte[], byte[]> stream) {
            super();
            this.stream = stream;
        }
         
        @Override
        public void run(){
            for(MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                 
                StringBuffer sb = new StringBuffer();
                     
                byte[] message = (byte[]) messageAndMetadata.message();
                 
                BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
                COL_ONEM2M read = null;
                String task_group_id = "";
                String task_id =  "";
                String start_time =  "";
                String colFrom =  "";
                String calcuate_latest_yn =  "";
                  
                try {
                     read = specificDatumReader.read(null, binaryDecoder);
                      
                     List<java.lang.CharSequence> data= read.getData();
                      
                     task_group_id = read.getTaskGroupId().toString();
                     task_id = read.getTaskId().toString();
                         
                // 처리에 필요한 로직
                // .....
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
         
    }
}

번호 제목 날짜 조회 수
207 Not enough replica available for query at consistency QUORUM가 발생하는 경우 2017.06.21 1996
206 cassandra cluster 문제가 있는 node제거 하기(DN상태의 노드가 있으면 cassandra cluster 전체에 문제가 발생하므로 반드시 제거할것) 2017.06.21 1972
205 lagom을 이용한 샘플 경매 프로그램 실행방법 2017.06.20 2133
204 mysql-server 기동시 Do you already have another mysqld server running on port 오류 발생할때 확인및 조치방법 2017.05.14 3814
203 mapreduce appliction을 실행시 "is running beyond virtual memory limits" 오류 발생시 조치사항 2017.05.04 18279
202 Mysql DB 생성 및 권한. 특정아이피, 대역에 대한 접근 허용 2017.05.04 2839
201 Hive MetaStore Server기동시 Could not create "increment"/"table" value-generation container SEQUENCE_TABLE since autoCreate flags do not allow it. 오류발생시 조치사항 2017.05.03 1573
200 Ubuntu 16.04 LTS에 Hive 2.1.1설치하면서 "Version information not found in metastore"발생하는 오류원인및 조치사항 2017.05.03 1510
199 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적(?)으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 1181
198 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 1263
197 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 1308
196 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 1510
195 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 2017.05.02 1513
194 Ubuntu 16.04 LTS에 MariaDB 10.1설치 및 포트변경 및 원격접속 허용 2017.05.01 2876
193 Ubuntu 16.04 LTS에 4대에 Hadoop 2.8.0설치 2017.05.01 2242
» Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 2194
191 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 1303
190 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 2011
189 Spark에서 Serializable관련 오류및 조치사항 2017.04.21 6145
188 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 2017.04.19 1947
위로