메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


1. 다운로드

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz


2. upload및 압축풀기

  가. tar xvfz kafka_2.11-0.9.0.1.tgz

  나. ln -s kafka_2.11-0.9.0.1 kafka


3. properties파일 수정(zookeeper는 별도로 기동중인 상태임)

  가. config/consumer.properties : zookeeper.connect=sda1:2181,sda2:2181,so1:2181

  나. config/producter.properties : metadata.broker.list=sda1:9092,sda2:9092,so1:9092

  다. config/server.properties를 broker개수 만큼 복사하고 아래의 내용을 수정한다.

   (예, broker가 3개인경우 는server-1.properties, server-2.properties, server-3.properties의 

       이름을 각각 가지는 설정파일을 3개 만든다.)

    - broker.id=1 : broker마다 unique한 값

    - port=9092

    - host.name=sda1  : 각 서버의 고정ip

    - log.dirs=/logs/kafka/kafka-logs

    - zookeeper.connect=sda1:2181,sda2:2181,so1:2181


4. broker로 사용할 서버에 각각 복사(sda1, sda2, so1)

  scp -r -P 22 kafka_2.11-0.9.0.1 root@sda2:$HOME

  

5. broker노드로 사용할 각 서버에서 kafka server를 기동한다.(root로 각 서버에서 실행)

  가. ln -s kafka_2.11-0.9.0.1 kafka

  나. bin/kafka-server-start.sh config/server-1.properties &  (해당 서버에서 실행)

      (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-1.properties &

       와 같이 실행한다.)


  나-1. bin/kafka-server-start.sh config/server-2.properties &  (해당 서버에서 실행)

       (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-2.properties &

       와 같이 실행한다.)

  나-2. bin/kafka-server-start.sh config/server-3.properties &  (해당 서버에서 실행)

       (Kafka Manager모니터링툴을 이용하기 위한 JMX PORT를 설정하기 위해서

        env JMX_PORT=8989 bin/kafka-server-start.sh config/server-3.properties &

       와 같이 실행한다.)


  * 서버중지 : bin/kafka-server-stop.sh

  * 데몬확인(jps -m) : 81589 Kafka config/server-1.properties

 

6. topic관리(토픽명 : test-topic)

 가. 생성

  * bin/kafka-topics.sh --create --zookeeper sda1:2181,sda2:2181,so1:2181 --replication-factor 3 --partitions 3 --topic test-topic


 나. 목록

 * topic목록 확인 : bin/kafka-topics.sh --list --zookeeper sda1:2181,sda2:2181,so1:2181


 다. topic정보

 * topic정보 : bin/kafka-topics.sh --describe --zookeeper sda1:2181,sda2:2181,so1:2181


 라. topic삭제

 * topic삭제 : bin/kafka-topics.sh --delete --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COLLECT_ONEM2M 

 * 지정topic(예, COL_TEST)에 대한 replica와 partition정보 확인 :

     ./kafka-topics.sh --describe  --zookeeper sda1:2181 --topic COL_TEST


 마. topic설정 정보 변경

 * topic 설정변경(partitions의 수를 5로 변경하는 경우) : bin/kafka-topics.sh --alter --zookeeper sda1:2181,sda2:2181,so1:2181 --topic COL_TEST --partitions 5


 바.topic 생성시 주의사항 

    존재하지 않은 topic에 대하여 메시지를 생산하거나 소비할 경우 broker의 설정값에 따라 디폴트 설정으로 topic을 자동생성될 수 있다. (자동생성하지 않도록 하려면 auto.create.topics.enable를 false로 설정한다)


 - broker관련 기본적으로 설정해야 하는 값(Kafka API사용시 conf설정)

   auto.create.topics.enable=true

   num.partitions=1   

   default.replication.factor=1

   delete.topic.enable=false


   #30MB(전송가능량, Kafka API사용시 conf설정)

   message.max.bytes=31457280

   replica.fetch.max.bytes=31457280


8. test

  가. producer기동

    bin/kafka-console-producer.sh --broker-list sda1:9092,sda2:9092,so1:9092 --topic test-topic

  나. consumer기동

    bin/kafka-console-consumer.sh --zookeeper sda1:2181,sda2:2181,so1:2181 --from-beginning --topic test-topic


  * producer의 console에서 텍스트 입력후 consumer에서 data가 보이면 정상적으로 작동되는것임


9. Kafka API를 이용하여 consumer프로그램 만들때 conf는 아래와 같이 설정해준다.

properties.put("zookeeper.connect",Utils.ZOOKEEPER_LIST);

properties.put("group.id",user_id);

properties.put("zookeeper.session.timeout.ms", "500");

properties.put("zookeeper.sync.time.ms", "250");

properties.put("auto.commit.interval.ms", "1000");

properties.put("fetch.message.max.bytes", "31457280"); // 30MB 전송가능함

properties.put("auto.offset.reset", "smallest");

번호 제목 날짜 조회 수
122 spark-submit으로 spark application실행하는 다양한 방법 2016.05.25 1106
121 spark 온라인 책자링크 (제목 : mastering-apache-spark) 2016.05.25 467
120 "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"오류 발생시 조치사항 2016.05.25 1632
119 spark-env.sh에서 사용할 수있는 항목. 2016.05.24 1243
118 Spark 1.6.1 설치후 HA구성 2016.05.24 1278
117 spark-shell실행시 "A read-only user or a user in a read-only database is not permitted to disable read-only mode on a connection."오류가 발생하는 경우 해결방법 2016.05.20 688
116 Master rejected startup because clock is out of sync 오류 해결방법 2016.05.03 252
115 kafka broker기동시 brokerId가 달라서 기동에 실패하는 경우 조치방법 2016.05.02 2488
» kafka 0.9.0.1 for scala 2.1.1 설치및 테스트 2016.05.02 1048
113 Spark 2.1.1 clustering(5대) 설치(YARN기반) 2016.04.22 2217
112 Hadoop 완벽 가이드 정리된 링크 2016.04.19 957
111 bin/cassandra -f -R로 startup할때 NullPointerException오류가 나면 조치할 내용 2016.04.14 459
110 Cassandra 3.4(3.10) 설치/설정 (5대로 clustering) 2016.04.11 997
109 Incompatible clusterIDs오류 원인및 해결방법 2016.04.01 661
108 namenode오류 복구시 사용하는 명령 2016.04.01 591
107 "java.net.NoRouteToHostException: 호스트로 갈 루트가 없음" 오류시 확인및 조치할 사항 2016.04.01 3451
106 CentOS의 서버 5대에 yarn(hadoop 2.7.2)설치하기-ResourceManager HA/HDFS HA포함, JobHistory포함 2016.03.29 1260
105 Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기 2016.03.11 496
104 CDH 5.4.4 버전에서 hive on tez (0.7.0)설치하기 2016.01.14 800
103 mongodb에서 큰데이타 sort시 오류발생에 대한 해결방법 2015.12.22 1009
위로