메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


0. topic은 동일한 데이타 구조를 가지는 개별로 topic을 가지고 있어야함

1. avro schema파일 준비(emp.avsc)

{

   "namespace": "w3ii.com",

   "type": "record",

   "name": "emp",

   "fields": [

      {"name": "name", "type": "string"},

      {"name": "id", "type": "int"},

      {"name": "salary", "type": "int"},

      {"name": "age", "type": "int"},

      {"name": "address", "type": "string"}

   ]

}


* 아래의 클래스 구조를 avro schema로 정의하는 샘플

class Child {

    String name;

}


class Parent {

    list<Child> children;

}


아래와 같이 정의해야 한다.------->

{

"name": "Parent",

"type":"record",

"fields":[

    {

        "name":"children",

        "type":{

            "type": "array",  

            "items":{

                        "name":"Child",

                        "type":"record",

                        "fields":[

                            {"name":"name", "type":"string"}

                        ]

                    }

            }

    }

}



2. avsc파일 컴파일

C:tmpavro-tools>java -jar avro-tools-1.7.7.jar compile schema emp.avsc .

Input files to compile:

  emp.avsc


* 컴파일 결과 생성파일 : C:tmpavro-toolsw3iicomemp.java


3. serializing/ deserializing

  가. serializing

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}


  나. deserializing

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();

topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);



4. producer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.io.IOException;

import java.util.Properties;


import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;


import org.apache.avro.io.BinaryEncoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.specific.SpecificDatumWriter;

import org.apache.commons.io.IOUtils;

import org.apache.commons.io.output.ByteArrayOutputStream;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpEmitter {

public Producer<String, byte[]> producer;

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

/**

* broker주소

*/

private static String BROKER = "gsda1:9092,gsda2:9092,gsda3:9092";


public AvroEmpEmitter(String broker) {

Properties props = new Properties();

props.put("metadata.broker.list", broker);

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

props.put("request.required.acks", "1");

producer = new Producer<String, byte[]>(new ProducerConfig(props));

}

public void send(Emp event) {

EncoderFactory avroEncoderFactory = EncoderFactory.get();

SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);

ByteArrayOutputStream stream = new ByteArrayOutputStream();

BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream,null);

try {

avroEventWriter.write(event, binaryEncoder);

binaryEncoder.flush();

} catch (IOException e) {

e.printStackTrace();

}

IOUtils.closeQuietly(stream);


KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(

TOPIC, stream.toByteArray());


producer.send(data);

}

public static void main(String[] args) {

AvroEmpEmitter avroOneM2MEmitter = new AvroEmpEmitter(BROKER);

// Emp전송

System.out.println("Send start(Emp)......................");

avroOneM2MEmitter.send(buildEmp());

System.out.println("Send end(Emp)......................");


// 전송끝 

avroOneM2MEmitter.close();


}

private void close() {

producer.close();

}


private static Emp buildEmp() {

Emp emp = new Emp();

emp.setId(1000);

emp.setName("이벤트명칭");

emp.setSalary(20);

emp.setAge(1);

emp.setAddress("여기는 주소입니다.");

return emp;

}

}


5. consumer 샘플소스

package com.gooper.icbms.sda.test.kafka;


import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;


import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;


import org.apache.avro.io.BinaryDecoder;

import org.apache.avro.io.DecoderFactory;

import org.apache.avro.specific.SpecificDatumReader;


import com.gooper.icbms.sda.comm.util.Utils;

import com.gooper.icbms.sda.test.kafka.avro.Emp;


public class AvroEmpSubscribe {

private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

private static final String ZOOKEEPER_CONNECTION = "gsda1:2181,gsda2:2181,gsda3:2181";

@SuppressWarnings({ "rawtypes", "unchecked" })

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("zookeeper.connect",ZOOKEEPER_CONNECTION);

properties.put("group.id","testgroup_11");

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

   //properties.put("auto.commit.enable", "false"); 

properties.put("auto.commit.enable", "true"); 

                // 아래 값을 너무 짧게 설정하면 처리할 데이타가 많이 몰리는 경우 처리되지 않는 경우가 발생할 수 있음

// default값이 60000임

properties.put("auto.commit.interval.ms", "60000"); 

properties.put("fetch.message.max.bytes", "31457280"); // 30MB  

properties.put("auto.offset.reset", "smallest");

//properties.put("auto.offset.reset", "largest"); // 최근것부터 처리

ConsumerConnector consumer = 

Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

Map<String, Integer> topicCount = new HashMap<String, Integer>();


topicCount.put(TOPIC, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);

SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);

for (final KafkaStream stream : streams) {

ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();

while (consumerIte.hasNext()) {

try {

MessageAndMetadata msg = consumerIte.next();

byte[] message = (byte[]) msg.message();

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);

Emp read = specificDatumReader.read(null, binaryDecoder);

System.out.println("Message from Topic("+TOPIC+") : " + read.toString());

System.out.println("Message(name) from Topic("+TOPIC+") : " + read.getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}

if (consumer != null) consumer.shutdown();

}

}



* 참고 : https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema

번호 제목 날짜 조회 수
242 커리 변경 이벤트를 처리하기 위한 구현클래스 2016.07.21 865
241 룰에 매칭되면 발생되는 엑티베이션 객체에 대한 작업(이전값 혹은 현재값)을 처리하는 클래스 파일 2016.07.21 445
240 실시간 쿼리 변환 모니터링(팩트내 필드값의 변경사항을 실시간으로 추적함)하는 테스트 java 프로그램 file 2016.07.21 199
239 Drools 6.0 - 비즈니스 룰 기반으로 간단한 룰 애플리케이션 만들기 file 2016.07.18 1037
238 Apache Spark와 Drools를 이용한 CEP구현 테스트 2016.07.15 1111
237 org.apache.hadoop.hbase.ClockOutOfSyncException: org.apache.hadoop.hbase.ClockOutOfSyncException 오류시 조치사항 2016.07.14 241
236 kafka로 부터 메세지를 stream으로 받아 처리하는 spark샘플소스(spark의 producer와 consumer를 sbt로 컴파일 하고 서버에서 spark-submit하는 방법) 2016.07.13 788
235 [sbt] sbt-assembly를 이용하여 실행에 필요한 모든 j라이브러리를 포함한 fat jar파일 만들기 2016.07.11 2142
234 [sbt] sbt 0.13.11 를 windows에 설치하고 scala프로그램을 compile해서 jar파일 만들기 2016.07.11 678
» avro 사용하기(avsc 스키마 파일 컴파일 방법, consumer, producer샘플소스) 2016.07.08 1573
232 DataSetCreator실행시 "Illegal character in fragment at index"오류가 나는 경우 조치방안 2016.06.17 603
231 5건의 triple data를 이용하여 특정 작업 폴더에서 작업하는 방법/절차 2016.06.16 156
230 queryTranslator실행시 NullPointerException가 발생전에 java.lang.ArrayIndexOutOfBoundsException발생시 조치사항 2016.06.16 937
229 S2RDF를 실행부분만 추출하여 1건의 triple data를 HDFS에 등록, sparql을 sql로 변환, sql실행하는 방법및 S2RDF소스 컴파일 방법 2016.06.15 551
228 S2RDF모듈의 실행부분만 추출하여 별도록 실행하는 방법(draft) 2016.06.14 292
227 spark-sql실행시 ERROR log: Got exception: java.lang.NumberFormatException For input string: "2000ms" 오류발생시 조치사항 2016.06.09 390
226 spark-sql실행시 Caused by: java.lang.NumberFormatException: For input string: "0s" 오류발생시 조치사항 2016.06.09 4759
225 spark-sql실행시 The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH오류 발생시 조치사항 2016.06.09 684
224 ./spark-sql 실행시 "java.lang.NumberFormatException: For input string: "1s"오류발생시 조치사항 2016.06.09 529
223 beeline실행시 User: root is not allowed to impersonate오류 발생시 조치사항 2016.06.03 922
위로