Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.
1. 다운로드
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
2. upload및 압축풀기
가. tar xvfz kafka_2.11-0.9.0.1.tgz
나. ln -s kafka_2.11-0.9.0.1 kafka
3. properties파일 수정(zookeeper는 별도로 기동중인 상태임)
가. config/consumer.properties : zookeeper.connect=sda1:2181,sda2:2181,so1:2181
나. config/producter.properties : metadata.broker.list=sda1:9092,sda2:9092,so1:9092
다. config/server.properties를 broker개수 만큼 복사하고 아래의 내용을 수정한다.
(예, broker가 3개인경우 는server-1.properties, server-2.properties, server-3.properties의
이름을 각각 가지는 설정파일을 3개 만든다.)
- broker.id=1 : broker마다 unique한 값
- port=9092
- host.name=sda1 : 각 서버의 고정ip
- log.dirs=/logs/kafka/kafka-logs
- zookeeper.connect=sda1:2181,sda2:2181,so1:2181
4. broker로 사용할 서버에 각각 복사(sda1, sda2, so1)
scp -r -P 22 kafka_2.11-0.9.0.1 root@sda2:$HOME
5. broker노드로 사용할 각 서버에서 kafka server를 기동한다.(root로 각 서버에서 실행)
가. ln -s kafka_2.11-0.9.0.1 kafka
나. bin/kafka-server-start.sh config/server-1.properties & (해당 서버에서 실행)
(Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서
env JMX_PORT=8989 bin/kafka-server-start.sh config/server-1.properties &
와 같이 실행한다.)
나-1. bin/kafka-server-start.sh config/server-2.properties & (해당 서버에서 실행)
(Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서
env JMX_PORT=8989 bin/kafka-server-start.sh config/server-2.properties &
와 같이 실행한다.)
나-2. bin/kafka-server-start.sh config/server-3.properties & (해당 서버에서 실행)
(Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서
env JMX_PORT=8989 bin/kafka-server-start.sh config/server-3.properties &
와 같이 실행한다.)
* 서버중지 : bin/kafka-server-stop.sh
* 데몬확인(jps -m) : 81589 Kafka config/server-1.properties
6. topic관리(토픽명 : test-topic)
가. 생성
* bin/kafka-topics.sh --create --zookeeper sda1:2181,sda2:2181,so1:2181 --replication-factor 3 --partitions 3 --topic test-topic
나. 목록
* topic목록 확인 : bin/kafka-topics.sh --list --zookeeper sda1:2181,sda2:2181,so1:2181
다. topic정보
* topic정보 : bin/kafka-topics.sh --describe --zookeeper sda1:2181,sda2:2181,so1:2181
라. topic삭제
* topic삭제 : bin/kafka-topics.sh --delete --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COLLECT_ONEM2M
* 지정topic(예, COL_TEST)에 대한 replica와 partition정보 확인 :
./kafka-topics.sh --describe --zookeeper sda1:2181 --topic COL_TEST
마. topic설정 정보 변경
* topic 설정변경(partitions의 수를 5로 변경하는 경우) : bin/kafka-topics.sh --alter --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COL_TEST --partitions 5
바.topic 생성시 주의사항
존재하지 않은 topic에 대하여 메시지를 생산하거나 소비할 경우 broker의 설정값에 따라 디폴트 설정으로 topic을 자동생성될 수 있다. (자동생성하지 않도록 하려면 auto.create.topics.enable를 false로 설정한다)
- broker관련 기본적으로 설정해야 하는 값(Kafka API사용시 conf설정)
auto.create.topics.enable=true
num.partitions=1
default.replication.factor=1
delete.topic.enable=false
#30MB(전송가능량, Kafka API사용시 conf설정)
message.max.bytes=31457280
replica.fetch.max.bytes=31457280
8. test
가. producer기동
bin/kafka-console-producer.sh --broker-list sda1:9092,sda2:9092,so1:9092 --topic test-topic
나. consumer기동
bin/kafka-console-consumer.sh --zookeeper sda1:2181,sda2:2181,so1:2181 --from-beginning --topic test-topic
* producer의 console에서 텍스트 입력후 consumer에서 data가 보이면 정상적으로 작동되는것임
9. Kafka API를 이용하여 consumer프로그램 만들때 conf는 아래와 같이 설정해준다.
properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);
properties.put("group.id",user_id);
properties.put("zookeeper.session.timeout.ms", "500");
properties.put("zookeeper.sync.time.ms", "250");
properties.put("auto.commit.interval.ms", "1000");
properties.put("fetch.message.max.bytes", "31457280"); // 30MB 전송가능함
properties.put("auto.offset.reset", "smallest");