메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


KafkaWordCount.scala를 컴파일하여 jar로 만들고 아래중 한가지 방법으로 Consumer를 실행시킬수 있다.
(test-topic은 kafka에 topic으로 생성되어 있어야 하며 group name은 testg-1로 했다)

* 참고1 : msg producer생성 프로그램 실행(별도의 console창에서 아래를 먼저 실행해준다)
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar sda1:7077,sda2:7077 test-topic 1 1

참고2 : icbms-assembly-2.0.jar는 KafkaWordCount와 관련 jar파일이 모두 포함된 uber jar파일이고
icbms_2.10-2.0.jar는 관련jar가 포함되지 않은 KafkaWordCount.scala를 compile하여 jar로 만든 파일이다.

------------방법1(--master를 yarn으로 지정하고 --jars 옵션에 ,를 이용하여 필요한 jar를 모두 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3

------------방법2(--master를 yarn으로 지정하고 --jars 옵션과 --files옵션을 이용하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3
 
------------방법3(--master를 local[2]로 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  sda1:2181,sda2:2181 testg-1 test-topic 3

------------방법4(--master를 spark 지정하고 --jars 옵션을 이용하여 uber jar만 지정하는 경우)----------
/svc/sda/bin/hadoop/spark/bin/spark-submit --master spark://sda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar  sda1:2181,sda2:2181,sda3:2181 testg-1 test-topic 3


----------------------------샘플소스(KafkaWordCount.scala)---------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 날짜 조회 수
741 [Ranger]RangerAdminRESTClient Error gertting pplicies; Received NULL response!!, secureMode=true, user=rangerkms/node01.gooper.com@ GOOPER.COM (auth:KERBEROS), serviceName=cm_kms 2023.06.27 73
740 [vue storefrontui]외부 API통합하기 참고 문서 2022.02.09 80
739 [Encryption Zone]Encryption Zone에 생성된 table을 select할때 HDFS /tmp/zone1에 대한 권한이 없는 경우 2023.06.29 83
738 ./gradlew :composeDown 및 ./gradlew :composeUp 를 성공했을때의 메세지 2023.02.20 84
737 [EncryptionZone]User:testuser not allowed to do "DECRYPT_EEK" on 'testkey' 2023.06.29 89
736 [vi] test.nq파일에서 특정문자열(예, <>)을 찾아서 포함되는 라인을 삭제한 동일한 이름의 파일을 만드는 방법 2017.01.25 98
735 [Impala] alter table구문수행시 "WARNINGS: Impala does not have READ_WRITE access to path 'hdfs://nameservice1/DATA/Temp/DB/source/table01_ccd'" 발생시 조치 2024.04.26 98
734 CM의 Impala->Query tab에서 FINISHED query가 보이지 않는 현상 2021.08.31 99
733 restaurant-controller,에서 등록 예시 2022.04.30 99
732 주문히스토리 조회 2022.04.30 99
731 [Hue metadata]Oracle에 있는 Hue 메타정보 테이블을 이용하여 coordinator와 workflow관계 목록을 추출하는 방법 2023.08.22 99
730 [Cloudera Agent] Metadata-Plugin throttling_logger INFO (713 skipped) Unable to send data to nav server. Will try again. 2022.05.16 103
729 oozie의 sqoop action수행시 ooize:launcher의 applicationId를 이용하여 oozie:action의 applicationId및 관련 로그를 찾는 방법 2023.07.26 104
728 [CDP7.1.6,HDFS]HDFS파일을 삭제하고 Trash비움이 완료된후에도 HDFS 공간을 차지하고 있는 경우 확인/조치 방법 2023.07.17 108
727 [CDP7.1.7, Replication]Encryption Zone내 HDFS파일을 비Encryption Zone으로 HDFS Replication시 User hdfs가 아닌 hadoop으로 수행하는 방법 2024.01.15 110
726 주문 생성 데이터 예시 2022.04.30 112
725 호출 url현황 2023.02.21 112
724 [CDP7.1.7, Hive Replication]Hive Replication진행중 "The following columns have types incompatible with the existing columns in their respective positions " 오류 2023.12.27 116
723 eclipse 3.1 단축키 정리파일 2017.01.02 118
722 [CDP7.1.7]Oozie job에서 ERROR: Kudu error(s) reported, first error: Timed out: Failed to write batch of 774 ops to tablet 8003f9a064bf4be5890a178439b2ba91가 발생하면서 쿼리가 실패하는 경우 2024.01.05 118
위로