메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


0. test-topic은 미리 생성해둔다.
(./bin/kafka-topics.sh --create --zookeeper gsda1:2181,gsda2:2181,gsda3:2181 --replication-factor 3 --partitions 3 --topic test-topic)

1. scala-ide용 eclipse에서 아래의 소스를 편집한다.

2. 해당 프로젝트의 console창에서 "sbt clean assemlby"를 실행하여 fat jar파일을 만든다.(파일명 : icbms-assembly-2.0.jar)

3. 서버에서 producer를 실행한다.(icbms.test.KafkaWordCountProducer)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCountProducer --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:7077,gsda2:7077 test-topic 1 1

4. 서버에서 consumer를 실행한다.(icbms.test.KafkaWordCount)
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 1


* 다양한 실행방법
    (icbms-assembly-2.0.jar은 "sbt assembly"명령으로 만들어지며, icbms_2.10-2.0.jar는 "sbt package"명령으로 만들어진다.)

가. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar,icbms_2.10-2.0.jar icbms_2.10-2.0.jar  gsda1:2181,gsda2:2181 testg-1 test-topic 3

나. yarn에서 실행(#1) : /svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master yarn --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar --files icbms_2.10-2.0.jar icbms_2.10-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

다. spark cluster에서 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master spark://gsda1:7077,sda2:7077 --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms-assembly-2.0.jar gsda1:2181,gsda2:2181 testg-1 test-topic 3

라. local모드로 실행
/svc/apps/sda/bin/hadoop/spark/bin/spark-submit --master local[2] --class icbms.test.KafkaWordCount --jars icbms-assembly-2.0.jar icbms_2.10-2.0.jar gsda1:2181,sda2:2181 testg-1 test-topic 3



-----------------scala소스 빌드용 설정파일(project.sbt) ---------------
import sbtassembly.AssemblyPlugin._

name := "icbms"

version := "2.0"

 //scalaVersion := "2.11.8"
scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies ++= Seq(
("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
.exclude("org.mortbay.jetty", "servlet-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("com.codahale.metrics", "metrics-core")
,
"org.apache.spark" %% "spark-sql" % "1.3.1" ,
"org.apache.spark" % "spark-streaming_2.10" % "1.3.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.1" ,
"org.apache.kafka" % "kafka_2.10" % "0.9.0.1" ,
"org.apache.avro" % "avro" % "1.7.7" 
)

assemblyMergeStrategy in assembly := {
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
}

----------------------소스파일---------------
package icbms.test

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * Example:
 *    `$ bin/run-example
 *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
 *      my-consumer-group topic1,topic2 1`
 */
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    //sparkConf.setMaster("spark://gsda1:7077,gsda2:7077")
    //sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// Produces some random words between 1 and 100.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
번호 제목 날짜 조회 수
741 [Ranger]RangerAdminRESTClient Error gertting pplicies; Received NULL response!!, secureMode=true, user=rangerkms/node01.gooper.com@ GOOPER.COM (auth:KERBEROS), serviceName=cm_kms 2023.06.27 73
740 [vue storefrontui]외부 API통합하기 참고 문서 2022.02.09 80
739 [Encryption Zone]Encryption Zone에 생성된 table을 select할때 HDFS /tmp/zone1에 대한 권한이 없는 경우 2023.06.29 83
738 ./gradlew :composeDown 및 ./gradlew :composeUp 를 성공했을때의 메세지 2023.02.20 84
737 [EncryptionZone]User:testuser not allowed to do "DECRYPT_EEK" on 'testkey' 2023.06.29 89
736 [vi] test.nq파일에서 특정문자열(예, <>)을 찾아서 포함되는 라인을 삭제한 동일한 이름의 파일을 만드는 방법 2017.01.25 98
735 [Impala] alter table구문수행시 "WARNINGS: Impala does not have READ_WRITE access to path 'hdfs://nameservice1/DATA/Temp/DB/source/table01_ccd'" 발생시 조치 2024.04.26 98
734 CM의 Impala->Query tab에서 FINISHED query가 보이지 않는 현상 2021.08.31 99
733 restaurant-controller,에서 등록 예시 2022.04.30 99
732 주문히스토리 조회 2022.04.30 99
731 [Hue metadata]Oracle에 있는 Hue 메타정보 테이블을 이용하여 coordinator와 workflow관계 목록을 추출하는 방법 2023.08.22 99
730 [Cloudera Agent] Metadata-Plugin throttling_logger INFO (713 skipped) Unable to send data to nav server. Will try again. 2022.05.16 103
729 oozie의 sqoop action수행시 ooize:launcher의 applicationId를 이용하여 oozie:action의 applicationId및 관련 로그를 찾는 방법 2023.07.26 104
728 [CDP7.1.6,HDFS]HDFS파일을 삭제하고 Trash비움이 완료된후에도 HDFS 공간을 차지하고 있는 경우 확인/조치 방법 2023.07.17 108
727 [CDP7.1.7, Replication]Encryption Zone내 HDFS파일을 비Encryption Zone으로 HDFS Replication시 User hdfs가 아닌 hadoop으로 수행하는 방법 2024.01.15 110
726 주문 생성 데이터 예시 2022.04.30 112
725 호출 url현황 2023.02.21 112
724 [CDP7.1.7, Hive Replication]Hive Replication진행중 "The following columns have types incompatible with the existing columns in their respective positions " 오류 2023.12.27 116
723 eclipse 3.1 단축키 정리파일 2017.01.02 118
722 [CDP7.1.7]Oozie job에서 ERROR: Kudu error(s) reported, first error: Timed out: Failed to write batch of 774 ops to tablet 8003f9a064bf4be5890a178439b2ba91가 발생하면서 쿼리가 실패하는 경우 2024.01.05 118
위로