0. A topic should carry only records of a single data structure; each distinct data structure should get its own topic.
1. Prepare the Avro schema file (emp.avsc)
{
  "namespace": "w3ii.com",
  "type": "record",
  "name": "emp",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "id", "type": "int"},
    {"name": "salary", "type": "int"},
    {"name": "age", "type": "int"},
    {"name": "address", "type": "string"}
  ]
}
* Sample of expressing the class structure below as an Avro schema
class Child {
    String name;
}
class Parent {
    List<Child> children;
}
This must be defined as follows (a usage sketch for the generated classes follows the schema):
{
  "name": "Parent",
  "type": "record",
  "fields": [
    {
      "name": "children",
      "type": {
        "type": "array",
        "items": {
          "name": "Child",
          "type": "record",
          "fields": [
            {"name": "name", "type": "string"}
          ]
        }
      }
    }
  ]
}
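* For reference, once this schema is compiled with avro-tools (step 2), the generated Parent/Child classes can be populated through their generated builders. This is only a rough sketch assuming standard Avro 1.7.x code generation; the accessor names simply mirror the field names in the schema above.
import java.util.Arrays;

// Hypothetical usage of the generated classes (not part of the original sample)
Parent parent = Parent.newBuilder()
        .setChildren(Arrays.asList(
                Child.newBuilder().setName("first-child").build()))
        .build();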
2. Compile the .avsc file
C:\tmp\avro-tools>java -jar avro-tools-1.7.7.jar compile schema emp.avsc .
Input files to compile:
emp.avsc
* File generated by the compilation: C:\tmp\avro-tools\w3ii\com\emp.java
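* Note: code generation is not strictly required. As an alternative (a minimal sketch assuming avro-1.7.7 is on the classpath and emp.avsc is in the working directory), the schema can be parsed at runtime and records built generically:
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class GenericEmpExample {
    public static byte[] buildAndSerialize() throws IOException {
        // Parse emp.avsc at runtime instead of generating a class from it
        Schema schema = new Schema.Parser().parse(new File("emp.avsc"));

        GenericRecord emp = new GenericData.Record(schema);
        emp.put("name", "test");
        emp.put("id", 1000);
        emp.put("salary", 20);
        emp.put("age", 1);
        emp.put("address", "somewhere");

        // Same binary encoding as the SpecificRecord version used in step 3
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(emp, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}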
3. Serializing / deserializing
a. Serializing
public void send(Emp event) {
    EncoderFactory avroEncoderFactory = EncoderFactory.get();
    SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
    try {
        avroEventWriter.write(event, binaryEncoder);
        binaryEncoder.flush();
    } catch (IOException e) {
        e.printStackTrace();
    }
    IOUtils.closeQuietly(stream);
    KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(TOPIC, stream.toByteArray());
    producer.send(data);
}
b. Deserializing
ConsumerConnector consumer =
Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);
SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);
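For completeness, a single message payload (byte[]) pulled from the stream above would then be decoded roughly like this (a minimal sketch using the same Emp class generated in step 2):
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public static Emp deserialize(byte[] message) throws IOException {
    SpecificDatumReader<Emp> reader = new SpecificDatumReader<Emp>(Emp.class);
    // Wrap the raw Kafka message bytes in a binary decoder and rebuild the Emp record
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
    return reader.read(null, decoder);
}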
4. Producer sample source
package com.gooper.icbms.sda.test.kafka;

import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;

import com.gooper.icbms.sda.comm.util.Utils;
import com.gooper.icbms.sda.test.kafka.avro.Emp;

public class AvroEmpEmitter {
    public Producer<String, byte[]> producer;

    private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();

    /**
     * Broker addresses
     */
    private static String BROKER = "gsda1:9092,gsda2:9092,gsda3:9092";

    public AvroEmpEmitter(String broker) {
        Properties props = new Properties();
        props.put("metadata.broker.list", broker);
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        props.put("request.required.acks", "1");
        producer = new Producer<String, byte[]>(new ProducerConfig(props));
    }

    public void send(Emp event) {
        // Serialize the Emp record into Avro binary form and send it as the message value
        EncoderFactory avroEncoderFactory = EncoderFactory.get();
        SpecificDatumWriter<Emp> avroEventWriter = new SpecificDatumWriter<Emp>(Emp.SCHEMA$);
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
        try {
            avroEventWriter.write(event, binaryEncoder);
            binaryEncoder.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
        IOUtils.closeQuietly(stream);
        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(TOPIC, stream.toByteArray());
        producer.send(data);
    }

    public static void main(String[] args) {
        AvroEmpEmitter avroOneM2MEmitter = new AvroEmpEmitter(BROKER);
        // Send an Emp record
        System.out.println("Send start(Emp)......................");
        avroOneM2MEmitter.send(buildEmp());
        System.out.println("Send end(Emp)......................");
        // Done sending
        avroOneM2MEmitter.close();
    }

    private void close() {
        producer.close();
    }

    private static Emp buildEmp() {
        Emp emp = new Emp();
        emp.setId(1000);
        emp.setName("sample event name");
        emp.setSalary(20);
        emp.setAge(1);
        emp.setAddress("This is an address.");
        return emp;
    }
}
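* Note: the producer above uses the old kafka.javaapi.producer API, which was removed in later Kafka releases. A rough equivalent using the newer org.apache.kafka.clients.producer API might look like the sketch below; the broker list and Emp fields are carried over from the sample, while the literal topic name "COL_EMP" is an assumption (the original uses Utils.KafkaTopics.COL_EMP).
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.gooper.icbms.sda.test.kafka.avro.Emp;

public class NewApiEmpProducer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "gsda1:9092,gsda2:9092,gsda3:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Build and serialize an Emp exactly as in AvroEmpEmitter above
        Emp emp = new Emp();
        emp.setId(1000);
        emp.setName("sample event name");
        emp.setSalary(20);
        emp.setAge(1);
        emp.setAddress("This is an address.");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<Emp>(Emp.SCHEMA$).write(emp, encoder);
        encoder.flush();

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        producer.send(new ProducerRecord<String, byte[]>("COL_EMP", out.toByteArray()));
        producer.close();
    }
}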
5. Consumer sample source
package com.gooper.icbms.sda.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

import com.gooper.icbms.sda.comm.util.Utils;
import com.gooper.icbms.sda.test.kafka.avro.Emp;

public class AvroEmpSubscribe {
    private static final String TOPIC = Utils.KafkaTopics.COL_EMP.toString();
    private static final String ZOOKEEPER_CONNECTION = "gsda1:2181,gsda2:2181,gsda3:2181";

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", ZOOKEEPER_CONNECTION);
        properties.put("group.id", "testgroup_11");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        //properties.put("auto.commit.enable", "false");
        properties.put("auto.commit.enable", "true");
        // If this interval is set too short, messages can be left unprocessed when a large backlog builds up.
        // The default is 60000 (ms).
        properties.put("auto.commit.interval.ms", "60000");
        properties.put("fetch.message.max.bytes", "31457280"); // 30MB
        properties.put("auto.offset.reset", "smallest");
        //properties.put("auto.offset.reset", "largest"); // start from the most recent offsets

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(TOPIC, new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(TOPIC);
        SpecificDatumReader<Emp> specificDatumReader = new SpecificDatumReader<Emp>(Emp.class);

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext()) {
                try {
                    MessageAndMetadata msg = consumerIte.next();
                    byte[] message = (byte[]) msg.message();
                    // Decode the Avro binary payload back into an Emp record
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(message, null);
                    Emp read = specificDatumReader.read(null, binaryDecoder);
                    System.out.println("Message from Topic(" + TOPIC + ") : " + read.toString());
                    System.out.println("Message(name) from Topic(" + TOPIC + ") : " + read.getName());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        if (consumer != null) consumer.shutdown();
    }
}
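* Note: likewise, the consumer above relies on the old ZooKeeper-based high-level consumer (kafka.javaapi.consumer.ConsumerConnector), which was removed in later Kafka releases. A rough equivalent with the newer org.apache.kafka.clients.consumer API is sketched below; broker list and group id are carried over from the sample, and the literal topic name "COL_EMP" is again an assumption.
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.gooper.icbms.sda.test.kafka.avro.Emp;

public class NewApiEmpConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "gsda1:9092,gsda2:9092,gsda3:9092");
        props.put("group.id", "testgroup_11");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest"); // counterpart of "smallest" above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Arrays.asList("COL_EMP"));

        SpecificDatumReader<Emp> reader = new SpecificDatumReader<Emp>(Emp.class);
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Decode the Avro payload back into an Emp record, as in AvroEmpSubscribe above
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    Emp emp = reader.read(null, decoder);
                    System.out.println("Message from Topic(COL_EMP) : " + emp);
                }
            }
        } finally {
            consumer.close();
        }
    }
}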
* Reference: https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema