메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 날짜 조회 수
261 servlet-api를 jar형태로 build할때 포함하지 말고 java 설치 위치의 jre/lib/ext에 복사하여 사용하는것이 좋다. 2016.08.10 607
260 [Elephas] Jena Elephas를 이용하여 Spark에서 rdfTriples의 RDD를 만들고 RDD관련 작업하는 샘플소스 2016.08.10 531
259 로컬의 라이브러리파일들을 dependency에 포함시키는 방법 2016.08.09 643
258 gradle을 이용하여 jar파일 생성시 provided속성을 지정할 수 있게 설정하는 방법 2016.08.09 601
257 [SBT] assembly시 "[error] deduplicate: different file contents found in the following:"오류 발생시 조치사항 2016.08.04 1036
256 [SBT] SBT 사용법 정리(링크) 2016.08.04 1033
255 [SBT] project.sbt에 libraryDependencies에 필요한 jar를 지정했으나 sbt compile할때 클래스를 못찾는 오류가 발생했을때 조치사항 2016.08.03 1148
254 build할때 unmappable character for encoding MS949 에러 발생시 조치사항 2016.08.03 888
» kafkaWordCount.scala의 producer와 consumer 클래스를 이용하여 kafka를 이용한 word count 테스트 하기 2016.08.02 585
252 bin/start-hbase.sh실행시 org.apache.hadoop.hbase.util.FileSystemVersionException: HBase file layout needs to be upgraded오류가 발생하면 조치사항 2016.08.01 535
251 start-all.sh로 spark데몬 기동시 "JAVA_HOME is not set"오류 발생시 조치사항 2016.08.01 1145
250 hadoop클러스터를 구성하던 서버중 HA를 담당하는 서버의 hostname등이 변경되어 문제가 발생했을때 조치사항 2016.07.29 447
249 Journal Storage Directory /data/hadoop/journal/data/mycluster not formatted 오류시 조치사항 2016.07.29 1699
248 슬라이딩 윈도우 예제 2016.07.28 275
247 거침없이 배우는 Drools 책의 샘플소스 file 2016.07.22 1496
246 drools를 이용한 로그,rule matching등의 테스트 java프로그램 file 2016.07.21 293
245 ServerInfo객체파일 2016.07.21 1138
244 drools에서 drl관련 로그를 기록하기 위한 클래스 파일 2016.07.21 869
243 워킹 메모리에 대한 정보를 처리하는 클래스 파일 2016.07.21 953
242 커리 변경 이벤트를 처리하기 위한 구현클래스 2016.07.21 862
위로